mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-25 05:10:27 +01:00
Compare commits
1 Commits
issue_4360
...
issue_4360
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c1be7daf19 |
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/cache/memorycache"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule/impllmpricingrule"
|
||||
|
||||
"github.com/gorilla/handlers"
|
||||
|
||||
@@ -111,9 +112,11 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
|
||||
}
|
||||
|
||||
// initiate agent config handler
|
||||
llmCostFeature := impllmpricingrule.NewLLMCostFeature(signoz.Modules.LLMPricingRule)
|
||||
|
||||
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
|
||||
Store: signoz.SQLStore,
|
||||
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController},
|
||||
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController, llmCostFeature},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -0,0 +1,74 @@
|
||||
package impllmpricingrule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
const LLMCostFeatureType agentConf.AgentFeatureType = "llm_pricing"
|
||||
|
||||
// LLMCostFeature implements agentConf.AgentFeature. It reads pricing rules
|
||||
// from the module and generates the signozllmpricing processor config for
|
||||
// deployment to OTel collectors via OpAMP.
|
||||
type LLMCostFeature struct {
|
||||
module llmpricingrule.Module
|
||||
}
|
||||
|
||||
func NewLLMCostFeature(module llmpricingrule.Module) *LLMCostFeature {
|
||||
return &LLMCostFeature{module: module}
|
||||
}
|
||||
|
||||
func (f *LLMCostFeature) AgentFeatureType() agentConf.AgentFeatureType {
|
||||
return LLMCostFeatureType
|
||||
}
|
||||
|
||||
func (f *LLMCostFeature) RecommendAgentConfig(
|
||||
orgId valuer.UUID,
|
||||
currentConfYaml []byte,
|
||||
configVersion *opamptypes.AgentConfigVersion,
|
||||
) ([]byte, string, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
rules, err := f.getEnabledRules(ctx, orgId)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
updatedConf, err := generateCollectorConfigWithLLMCost(currentConfYaml, rules)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
serialized, err := json.Marshal(rules)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
return updatedConf, string(serialized), nil
|
||||
}
|
||||
|
||||
// getEnabledRules fetches all enabled pricing rules for the given org.
|
||||
func (f *LLMCostFeature) getEnabledRules(ctx context.Context, orgId valuer.UUID) ([]*llmpricingruletypes.LLMPricingRule, error) {
|
||||
if f.module == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
rules, _, err := f.module.List(ctx, orgId, 0, 10000)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
enabled := make([]*llmpricingruletypes.LLMPricingRule, 0, len(rules))
|
||||
for _, r := range rules {
|
||||
if r.Enabled {
|
||||
enabled = append(enabled, r)
|
||||
}
|
||||
}
|
||||
return enabled, nil
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
package impllmpricingrule
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
const processorName = "signozllmpricing"
|
||||
|
||||
// buildProcessorConfig converts pricing rules into the signozllmpricing processor config.
|
||||
func buildProcessorConfig(rules []*llmpricingruletypes.LLMPricingRule) *llmpricingruletypes.LLMPricingRuleProcessorConfig {
|
||||
pricingRules := make([]llmpricingruletypes.LLMPricingRuleProcessor, 0, len(rules))
|
||||
for _, r := range rules {
|
||||
pricingRules = append(pricingRules, llmpricingruletypes.LLMPricingRuleProcessor{
|
||||
Name: r.Model,
|
||||
Pattern: r.ModelPattern,
|
||||
Cache: llmpricingruletypes.LLMPricingRuleProcessorCache{
|
||||
Mode: r.CacheMode.StringValue(),
|
||||
Read: r.CostCacheRead,
|
||||
Write: r.CostCacheWrite,
|
||||
},
|
||||
In: r.CostInput,
|
||||
Out: r.CostOutput,
|
||||
})
|
||||
}
|
||||
|
||||
return &llmpricingruletypes.LLMPricingRuleProcessorConfig{
|
||||
Attrs: llmpricingruletypes.LLMPricingRuleProcessorAttrs{
|
||||
Model: "gen_ai.request.model",
|
||||
In: "gen_ai.usage.input_tokens",
|
||||
Out: "gen_ai.usage.output_tokens",
|
||||
CacheRead: "gen_ai.usage.input_token_details.cached",
|
||||
CacheWrite: "gen_ai.usage.input_token_details.cache_creation",
|
||||
},
|
||||
DefaultPricing: llmpricingruletypes.LLMPricingRuleProcessorDefaultPricing{
|
||||
Unit: "per_million_tokens",
|
||||
Rules: pricingRules,
|
||||
},
|
||||
OutputAttrs: llmpricingruletypes.LLMPricingRuleProcessorOutputAttrs{
|
||||
In: "_signoz.gen_ai.cost_input",
|
||||
Out: "_signoz.gen_ai.cost_output",
|
||||
CacheRead: "_signoz.gen_ai.cost_cache_read",
|
||||
CacheWrite: "_signoz.gen_ai.cost_cache_write",
|
||||
Total: "_signoz.gen_ai.total_cost",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// generateCollectorConfigWithLLMCost injects (or replaces) the signozllmpricing
|
||||
// processor block in the collector YAML with one built from the given rules.
|
||||
// Pipeline wiring is handled by the collector's baseline config, not here.
|
||||
func generateCollectorConfigWithLLMCost(
|
||||
currentConfYaml []byte,
|
||||
rules []*llmpricingruletypes.LLMPricingRule,
|
||||
) ([]byte, error) {
|
||||
// Empty input: nothing to inject into. Pass through unchanged so we don't
|
||||
// turn it into "null\n" or fail on yaml.v3's EOF.
|
||||
if len(bytes.TrimSpace(currentConfYaml)) == 0 {
|
||||
return currentConfYaml, nil
|
||||
}
|
||||
|
||||
var collectorConf map[string]any
|
||||
if err := yaml.Unmarshal(currentConfYaml, &collectorConf); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal collector config: %w", err)
|
||||
}
|
||||
if collectorConf == nil {
|
||||
collectorConf = map[string]any{}
|
||||
}
|
||||
|
||||
processors := map[string]any{}
|
||||
if collectorConf["processors"] != nil {
|
||||
if p, ok := collectorConf["processors"].(map[string]any); ok {
|
||||
processors = p
|
||||
}
|
||||
}
|
||||
|
||||
procConfig := buildProcessorConfig(rules)
|
||||
configBytes, err := yaml.Marshal(procConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal llm cost processor config: %w", err)
|
||||
}
|
||||
var configMap any
|
||||
if err := yaml.Unmarshal(configBytes, &configMap); err != nil {
|
||||
return nil, fmt.Errorf("failed to re-unmarshal llm cost processor config: %w", err)
|
||||
}
|
||||
|
||||
processors[processorName] = configMap
|
||||
collectorConf["processors"] = processors
|
||||
|
||||
return yaml.Marshal(collectorConf)
|
||||
}
|
||||
@@ -0,0 +1,169 @@
|
||||
package impllmpricingrule
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
func makePricingRule(model string, patterns []string, cacheMode llmpricingruletypes.LLMPricingRuleCacheMode, costIn, costOut, cacheRead, cacheWrite float64) *llmpricingruletypes.LLMPricingRule {
|
||||
return &llmpricingruletypes.LLMPricingRule{
|
||||
Model: model,
|
||||
ModelPattern: patterns,
|
||||
Unit: llmpricingruletypes.UnitPerMillionTokens,
|
||||
CacheMode: cacheMode,
|
||||
CostInput: costIn,
|
||||
CostOutput: costOut,
|
||||
CostCacheRead: cacheRead,
|
||||
CostCacheWrite: cacheWrite,
|
||||
Enabled: true,
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildProcessorConfig_EmptyRules(t *testing.T) {
|
||||
cfg := buildProcessorConfig(nil)
|
||||
|
||||
require.NotNil(t, cfg)
|
||||
assert.Empty(t, cfg.DefaultPricing.Rules)
|
||||
assert.Equal(t, "per_million_tokens", cfg.DefaultPricing.Unit)
|
||||
|
||||
assert.Equal(t, "gen_ai.request.model", cfg.Attrs.Model)
|
||||
assert.Equal(t, "gen_ai.usage.input_tokens", cfg.Attrs.In)
|
||||
assert.Equal(t, "gen_ai.usage.output_tokens", cfg.Attrs.Out)
|
||||
assert.Equal(t, "gen_ai.usage.input_token_details.cached", cfg.Attrs.CacheRead)
|
||||
assert.Equal(t, "gen_ai.usage.input_token_details.cache_creation", cfg.Attrs.CacheWrite)
|
||||
|
||||
assert.Equal(t, "_signoz.gen_ai.cost_input", cfg.OutputAttrs.In)
|
||||
assert.Equal(t, "_signoz.gen_ai.cost_output", cfg.OutputAttrs.Out)
|
||||
assert.Equal(t, "_signoz.gen_ai.cost_cache_read", cfg.OutputAttrs.CacheRead)
|
||||
assert.Equal(t, "_signoz.gen_ai.cost_cache_write", cfg.OutputAttrs.CacheWrite)
|
||||
assert.Equal(t, "_signoz.gen_ai.total_cost", cfg.OutputAttrs.Total)
|
||||
}
|
||||
|
||||
func TestBuildProcessorConfig_SingleRule(t *testing.T) {
|
||||
rules := []*llmpricingruletypes.LLMPricingRule{
|
||||
makePricingRule("gpt-4o", []string{"gpt-4o*"}, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 5.0, 15.0, 2.5, 0),
|
||||
}
|
||||
|
||||
cfg := buildProcessorConfig(rules)
|
||||
|
||||
require.Len(t, cfg.DefaultPricing.Rules, 1)
|
||||
r := cfg.DefaultPricing.Rules[0]
|
||||
assert.Equal(t, "gpt-4o", r.Name)
|
||||
assert.Equal(t, []string{"gpt-4o*"}, r.Pattern)
|
||||
assert.Equal(t, 5.0, r.In)
|
||||
assert.Equal(t, 15.0, r.Out)
|
||||
assert.Equal(t, "subtract", r.Cache.Mode)
|
||||
assert.Equal(t, 2.5, r.Cache.Read)
|
||||
assert.Equal(t, 0.0, r.Cache.Write)
|
||||
}
|
||||
|
||||
func TestBuildProcessorConfig_MultipleRules_PreservesOrder(t *testing.T) {
|
||||
rules := []*llmpricingruletypes.LLMPricingRule{
|
||||
makePricingRule("gpt-4o", []string{"gpt-4o*"}, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 5.0, 15.0, 2.5, 0),
|
||||
makePricingRule("claude-sonnet", []string{"claude-sonnet-*", "claude-3-5-*"}, llmpricingruletypes.LLMPricingRuleCacheModeAdditive, 3.0, 15.0, 0.30, 3.75),
|
||||
makePricingRule("gemini", []string{"gemini-*"}, llmpricingruletypes.LLMPricingRuleCacheModeUnknown, 1.25, 5.0, 0, 0),
|
||||
}
|
||||
|
||||
cfg := buildProcessorConfig(rules)
|
||||
require.Len(t, cfg.DefaultPricing.Rules, 3)
|
||||
|
||||
assert.Equal(t, "gpt-4o", cfg.DefaultPricing.Rules[0].Name)
|
||||
assert.Equal(t, []string{"gpt-4o*"}, cfg.DefaultPricing.Rules[0].Pattern)
|
||||
assert.Equal(t, "subtract", cfg.DefaultPricing.Rules[0].Cache.Mode)
|
||||
|
||||
assert.Equal(t, "claude-sonnet", cfg.DefaultPricing.Rules[1].Name)
|
||||
assert.Equal(t, []string{"claude-sonnet-*", "claude-3-5-*"}, cfg.DefaultPricing.Rules[1].Pattern)
|
||||
assert.Equal(t, "additive", cfg.DefaultPricing.Rules[1].Cache.Mode)
|
||||
assert.Equal(t, 0.30, cfg.DefaultPricing.Rules[1].Cache.Read)
|
||||
assert.Equal(t, 3.75, cfg.DefaultPricing.Rules[1].Cache.Write)
|
||||
|
||||
assert.Equal(t, "gemini", cfg.DefaultPricing.Rules[2].Name)
|
||||
assert.Equal(t, []string{"gemini-*"}, cfg.DefaultPricing.Rules[2].Pattern)
|
||||
assert.Equal(t, "unknown", cfg.DefaultPricing.Rules[2].Cache.Mode)
|
||||
assert.Equal(t, 1.25, cfg.DefaultPricing.Rules[2].In)
|
||||
assert.Equal(t, 5.0, cfg.DefaultPricing.Rules[2].Out)
|
||||
}
|
||||
|
||||
func TestBuildProcessorConfig_NilPattern(t *testing.T) {
|
||||
rules := []*llmpricingruletypes.LLMPricingRule{
|
||||
makePricingRule("gpt-4o", nil, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 5.0, 15.0, 2.5, 0),
|
||||
}
|
||||
|
||||
cfg := buildProcessorConfig(rules)
|
||||
require.Len(t, cfg.DefaultPricing.Rules, 1)
|
||||
assert.Nil(t, cfg.DefaultPricing.Rules[0].Pattern)
|
||||
}
|
||||
|
||||
func TestGenerateCollectorConfig_NoRulesStillInjectsProcessor(t *testing.T) {
|
||||
// We deploy the processor even with zero rules so rules can be added
|
||||
// later (by a user or by Zeus) without any config-shape change.
|
||||
// Pipeline wiring is handled by the collector's baseline config.
|
||||
in := []byte(`
|
||||
receivers:
|
||||
otlp:
|
||||
protocols:
|
||||
grpc:
|
||||
processors:
|
||||
batch: {}
|
||||
exporters:
|
||||
otlp:
|
||||
endpoint: localhost:4317
|
||||
service:
|
||||
pipelines:
|
||||
traces:
|
||||
receivers: [otlp]
|
||||
processors: [batch, signozllmpricing]
|
||||
exporters: [otlp]
|
||||
`)
|
||||
|
||||
out, err := generateCollectorConfigWithLLMCost(in, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
var conf map[string]any
|
||||
require.NoError(t, yaml.Unmarshal(out, &conf))
|
||||
|
||||
processors := conf["processors"].(map[string]any)
|
||||
require.Contains(t, processors, processorName, "processor must be present even with zero rules")
|
||||
|
||||
procCfg := processors[processorName].(map[string]any)
|
||||
pricing := procCfg["default_pricing"].(map[string]any)
|
||||
if rules, ok := pricing["rules"].([]any); ok {
|
||||
assert.Empty(t, rules, "rules list must be empty when no pricing rules configured")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGenerateCollectorConfig_EmptyInput(t *testing.T) {
|
||||
// yaml.v3 returns an EOF error on empty/whitespace input; ensure the
|
||||
// generator passes it through unchanged instead.
|
||||
rules := []*llmpricingruletypes.LLMPricingRule{
|
||||
makePricingRule("gpt-4o", []string{"gpt-4o*"}, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 5.0, 15.0, 2.5, 0),
|
||||
}
|
||||
|
||||
for _, in := range [][]byte{nil, {}, []byte(" \n"), []byte("\t\t")} {
|
||||
out, err := generateCollectorConfigWithLLMCost(in, rules)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, in, out)
|
||||
|
||||
out, err = generateCollectorConfigWithLLMCost(in, nil)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, in, out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildProcessorConfig_ZeroCosts(t *testing.T) {
|
||||
rules := []*llmpricingruletypes.LLMPricingRule{
|
||||
makePricingRule("free-model", []string{"free-*"}, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 0, 0, 0, 0),
|
||||
}
|
||||
|
||||
cfg := buildProcessorConfig(rules)
|
||||
require.Len(t, cfg.DefaultPricing.Rules, 1)
|
||||
r := cfg.DefaultPricing.Rules[0]
|
||||
assert.Equal(t, 0.0, r.In)
|
||||
assert.Equal(t, 0.0, r.Out)
|
||||
assert.Equal(t, 0.0, r.Cache.Read)
|
||||
assert.Equal(t, 0.0, r.Cache.Write)
|
||||
}
|
||||
127
pkg/modules/llmpricingrule/impllmpricingrule/module.go
Normal file
127
pkg/modules/llmpricingrule/impllmpricingrule/module.go
Normal file
@@ -0,0 +1,127 @@
|
||||
package impllmpricingrule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type module struct {
|
||||
store llmpricingruletypes.Store
|
||||
}
|
||||
|
||||
func NewModule(store llmpricingruletypes.Store) llmpricingrule.Module {
|
||||
return &module{store: store}
|
||||
}
|
||||
|
||||
func (m *module) List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*llmpricingruletypes.LLMPricingRule, int, error) {
|
||||
storables, total, err := m.store.List(ctx, orgID, offset, limit)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
rules := make([]*llmpricingruletypes.LLMPricingRule, len(storables))
|
||||
for i, s := range storables {
|
||||
rules[i] = llmpricingruletypes.NewLLMPricingRuleFromStorable(s)
|
||||
}
|
||||
return rules, total, nil
|
||||
}
|
||||
|
||||
func (m *module) Get(ctx context.Context, orgID valuer.UUID, id valuer.UUID) (*llmpricingruletypes.LLMPricingRule, error) {
|
||||
s, err := m.store.Get(ctx, orgID, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return llmpricingruletypes.NewLLMPricingRuleFromStorable(s), nil
|
||||
}
|
||||
|
||||
// Update applies a batch of pricing rule changes:
|
||||
// - ID set → match by id, overwrite fields.
|
||||
// - SourceID set → match by source_id; if found overwrite, else insert.
|
||||
// - neither set → insert a new user-created row (is_override = true).
|
||||
//
|
||||
// When UpdatableLLMPricingRule.IsOverride is nil AND the matched row has
|
||||
// is_override = true, the row is fully preserved — only synced_at is stamped.
|
||||
func (m *module) Update(ctx context.Context, orgID valuer.UUID, userEmail string, rules []llmpricingruletypes.UpdatableLLMPricingRule) error {
|
||||
now := time.Now()
|
||||
|
||||
for _, r := range rules {
|
||||
existing, err := m.findExisting(ctx, orgID, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if existing == nil {
|
||||
if err := m.store.Create(ctx, llmpricingruletypes.NewStorablePricingRuleFromUpdatable(orgID, userEmail, now, r)); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if r.IsOverride == nil && existing.IsOverride {
|
||||
existing.SyncedAt = &now
|
||||
if err := m.store.Update(ctx, existing); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
applyUpdate(existing, userEmail, now, r)
|
||||
if err := m.store.Update(ctx, existing); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
agentConf.NotifyConfigUpdate(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *module) Delete(ctx context.Context, orgID, id valuer.UUID) error {
|
||||
if err := m.store.Delete(ctx, orgID, id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
agentConf.NotifyConfigUpdate(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *module) findExisting(ctx context.Context, orgID valuer.UUID, r llmpricingruletypes.UpdatableLLMPricingRule) (*llmpricingruletypes.StorableLLMPricingRule, error) {
|
||||
switch {
|
||||
case r.ID != nil:
|
||||
return m.store.Get(ctx, orgID, *r.ID)
|
||||
case r.SourceID != nil:
|
||||
s, err := m.store.GetBySourceID(ctx, orgID, *r.SourceID)
|
||||
if err != nil {
|
||||
if errors.Ast(err, errors.TypeNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
func applyUpdate(existing *llmpricingruletypes.StorableLLMPricingRule, userEmail string, now time.Time, r llmpricingruletypes.UpdatableLLMPricingRule) {
|
||||
existing.Model = r.Model
|
||||
existing.ModelPattern = r.ModelPattern
|
||||
existing.Unit = r.Unit
|
||||
existing.CacheMode = r.CacheMode
|
||||
existing.CostInput = r.CostInput
|
||||
existing.CostOutput = r.CostOutput
|
||||
existing.CostCacheRead = r.CostCacheRead
|
||||
existing.CostCacheWrite = r.CostCacheWrite
|
||||
if r.IsOverride != nil {
|
||||
existing.IsOverride = *r.IsOverride
|
||||
}
|
||||
existing.Enabled = r.Enabled
|
||||
existing.SyncedAt = &now
|
||||
existing.UpdatedAt = now
|
||||
existing.UpdatedBy = userEmail
|
||||
}
|
||||
137
pkg/modules/llmpricingrule/impllmpricingrule/store.go
Normal file
137
pkg/modules/llmpricingrule/impllmpricingrule/store.go
Normal file
@@ -0,0 +1,137 @@
|
||||
package impllmpricingrule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type store struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewStore(sqlstore sqlstore.SQLStore) llmpricingruletypes.Store {
|
||||
return &store{sqlstore: sqlstore}
|
||||
}
|
||||
|
||||
func (s *store) List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*llmpricingruletypes.StorableLLMPricingRule, int, error) {
|
||||
rules := make([]*llmpricingruletypes.StorableLLMPricingRule, 0)
|
||||
|
||||
count, err := s.sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(&rules).
|
||||
Where("org_id = ?", orgID).
|
||||
Order("created_at DESC").
|
||||
Offset(offset).
|
||||
Limit(limit).
|
||||
ScanAndCount(ctx)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return rules, count, nil
|
||||
}
|
||||
|
||||
func (s *store) Get(ctx context.Context, orgID, id valuer.UUID) (*llmpricingruletypes.StorableLLMPricingRule, error) {
|
||||
rule := new(llmpricingruletypes.StorableLLMPricingRule)
|
||||
|
||||
err := s.sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(rule).
|
||||
Where("org_id = ?", orgID).
|
||||
Where("id = ?", id).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, s.sqlstore.WrapNotFoundErrf(err, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule %s not found", id)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rule, nil
|
||||
}
|
||||
|
||||
func (s *store) GetBySourceID(ctx context.Context, orgID, sourceID valuer.UUID) (*llmpricingruletypes.StorableLLMPricingRule, error) {
|
||||
rule := new(llmpricingruletypes.StorableLLMPricingRule)
|
||||
|
||||
err := s.sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(rule).
|
||||
Where("org_id = ?", orgID).
|
||||
Where("source_id = ?", sourceID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, s.sqlstore.WrapNotFoundErrf(err, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule with source_id %s not found", sourceID)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rule, nil
|
||||
}
|
||||
|
||||
func (s *store) Create(ctx context.Context, rule *llmpricingruletypes.StorableLLMPricingRule) error {
|
||||
_, err := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewInsert().
|
||||
Model(rule).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) Update(ctx context.Context, rule *llmpricingruletypes.StorableLLMPricingRule) error {
|
||||
res, err := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewUpdate().
|
||||
Model(rule).
|
||||
Where("org_id = ?", rule.OrgID).
|
||||
Where("id = ?", rule.ID).
|
||||
ExcludeColumn("id", "org_id", "created_at", "created_by").
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return errors.Newf(errors.TypeNotFound, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule %s not found", rule.ID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) Delete(ctx context.Context, orgID, id valuer.UUID) error {
|
||||
res, err := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewDelete().
|
||||
Model((*llmpricingruletypes.StorableLLMPricingRule)(nil)).
|
||||
Where("org_id = ?", orgID).
|
||||
Where("id = ?", id).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return errors.Newf(errors.TypeNotFound, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule %s not found", id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/cache/memorycache"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule/impllmpricingrule"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser"
|
||||
|
||||
"github.com/gorilla/handlers"
|
||||
@@ -129,11 +130,14 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
|
||||
|
||||
opAmpModel.Init(signoz.SQLStore, signoz.Instrumentation.Logger(), signoz.Modules.OrgGetter)
|
||||
|
||||
llmCostFeature := impllmpricingrule.NewLLMCostFeature(signoz.Modules.LLMPricingRule)
|
||||
|
||||
agentConfMgr, err := agentConf.Initiate(
|
||||
&agentConf.ManagerOptions{
|
||||
Store: signoz.SQLStore,
|
||||
AgentFeatures: []agentConf.AgentFeature{
|
||||
logParsingPipelineController,
|
||||
llmCostFeature,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
@@ -116,6 +116,6 @@ func NewHandlers(
|
||||
CloudIntegrationHandler: implcloudintegration.NewHandler(modules.CloudIntegration),
|
||||
AlertmanagerHandler: signozalertmanager.NewHandler(alertmanagerService),
|
||||
RulerHandler: signozruler.NewHandler(rulerService),
|
||||
LLMPricingRuleHandler: impllmpricingrule.NewHandler(nil, providerSettings),
|
||||
LLMPricingRuleHandler: impllmpricingrule.NewHandler(modules.LLMPricingRule, providerSettings),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
|
||||
"github.com/SigNoz/signoz/pkg/modules/inframonitoring/implinframonitoring"
|
||||
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule"
|
||||
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule/impllmpricingrule"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
@@ -76,6 +78,7 @@ type Modules struct {
|
||||
ServiceAccount serviceaccount.Module
|
||||
CloudIntegration cloudintegration.Module
|
||||
RuleStateHistory rulestatehistory.Module
|
||||
LLMPricingRule llmpricingrule.Module
|
||||
}
|
||||
|
||||
func NewModules(
|
||||
@@ -127,5 +130,6 @@ func NewModules(
|
||||
ServiceAccount: serviceAccount,
|
||||
RuleStateHistory: implrulestatehistory.NewModule(implrulestatehistory.NewStore(telemetryStore, telemetryMetadataStore, providerSettings.Logger)),
|
||||
CloudIntegration: cloudIntegrationModule,
|
||||
LLMPricingRule: impllmpricingrule.NewModule(impllmpricingrule.NewStore(sqlstore)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,6 +195,7 @@ func NewSQLMigrationProviderFactories(
|
||||
sqlmigration.NewServiceAccountAuthzactory(sqlstore),
|
||||
sqlmigration.NewDropUserDeletedAtFactory(sqlstore, sqlschema),
|
||||
sqlmigration.NewMigrateAWSAllRegionsFactory(sqlstore),
|
||||
sqlmigration.NewAddLLMPricingRulesFactory(sqlstore, sqlschema),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
99
pkg/sqlmigration/078_add_llm_pricing_rules.go
Normal file
99
pkg/sqlmigration/078_add_llm_pricing_rules.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package sqlmigration
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/sqlschema"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
)
|
||||
|
||||
type addLLMPricingRules struct {
|
||||
sqlschema sqlschema.SQLSchema
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewAddLLMPricingRulesFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("add_llm_pricing_rule"), func(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
|
||||
return &addLLMPricingRules{
|
||||
sqlschema: sqlschema,
|
||||
sqlstore: sqlstore,
|
||||
}, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (migration *addLLMPricingRules) Register(migrations *migrate.Migrations) error {
|
||||
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *addLLMPricingRules) Up(ctx context.Context, db *bun.DB) error {
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = tx.Rollback()
|
||||
}()
|
||||
|
||||
sqls := [][]byte{}
|
||||
|
||||
tableSQLs := migration.sqlschema.Operator().CreateTable(&sqlschema.Table{
|
||||
Name: "llm_pricing_rule",
|
||||
Columns: []*sqlschema.Column{
|
||||
{Name: "id", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "created_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
|
||||
{Name: "updated_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
|
||||
{Name: "created_by", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "updated_by", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "org_id", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "source_id", DataType: sqlschema.DataTypeText, Nullable: true},
|
||||
{Name: "model", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "model_pattern", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "unit", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "cache_mode", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "cost_input", DataType: sqlschema.DataTypeNumeric, Nullable: false},
|
||||
{Name: "cost_output", DataType: sqlschema.DataTypeNumeric, Nullable: false},
|
||||
{Name: "cost_cache_read", DataType: sqlschema.DataTypeNumeric, Nullable: false},
|
||||
{Name: "cost_cache_write", DataType: sqlschema.DataTypeNumeric, Nullable: false},
|
||||
{Name: "is_override", DataType: sqlschema.DataTypeBoolean, Nullable: false, Default: "false"},
|
||||
{Name: "synced_at", DataType: sqlschema.DataTypeTimestamp, Nullable: true},
|
||||
{Name: "enabled", DataType: sqlschema.DataTypeBoolean, Nullable: false, Default: "true"},
|
||||
},
|
||||
PrimaryKeyConstraint: &sqlschema.PrimaryKeyConstraint{
|
||||
ColumnNames: []sqlschema.ColumnName{"id"},
|
||||
},
|
||||
ForeignKeyConstraints: []*sqlschema.ForeignKeyConstraint{
|
||||
{
|
||||
ReferencingColumnName: sqlschema.ColumnName("org_id"),
|
||||
ReferencedTableName: sqlschema.TableName("organizations"),
|
||||
ReferencedColumnName: sqlschema.ColumnName("id"),
|
||||
},
|
||||
},
|
||||
})
|
||||
sqls = append(sqls, tableSQLs...)
|
||||
|
||||
indexSQLs := migration.sqlschema.Operator().CreateIndex(
|
||||
&sqlschema.UniqueIndex{
|
||||
TableName: "llm_pricing_rule",
|
||||
ColumnNames: []sqlschema.ColumnName{"org_id", "source_id"},
|
||||
})
|
||||
sqls = append(sqls, indexSQLs...)
|
||||
|
||||
for _, sql := range sqls {
|
||||
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (migration *addLLMPricingRules) Down(context.Context, *bun.DB) error {
|
||||
return nil
|
||||
}
|
||||
49
pkg/types/llmpricingruletypes/processor_config.go
Normal file
49
pkg/types/llmpricingruletypes/processor_config.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package llmpricingruletypes
|
||||
|
||||
// LLMPricingRuleProcessorConfig is the top-level config for the signozllmpricing
|
||||
// OTel processor that gets deployed to collectors via OpAMP.
|
||||
type LLMPricingRuleProcessorConfig struct {
|
||||
Attrs LLMPricingRuleProcessorAttrs `yaml:"attrs" json:"attrs"`
|
||||
DefaultPricing LLMPricingRuleProcessorDefaultPricing `yaml:"default_pricing" json:"default_pricing"`
|
||||
OutputAttrs LLMPricingRuleProcessorOutputAttrs `yaml:"output_attrs" json:"output_attrs"`
|
||||
}
|
||||
|
||||
// LLMCostAttrs maps span attribute names to the processor's input fields.
|
||||
type LLMPricingRuleProcessorAttrs struct {
|
||||
Model string `yaml:"model" json:"model"`
|
||||
In string `yaml:"in" json:"in"`
|
||||
Out string `yaml:"out" json:"out"`
|
||||
CacheRead string `yaml:"cache_read" json:"cache_read"`
|
||||
CacheWrite string `yaml:"cache_write" json:"cache_write"`
|
||||
}
|
||||
|
||||
// LLMPricingRuleDefaultPricing holds the pricing unit and the list of model-specific rules.
|
||||
type LLMPricingRuleProcessorDefaultPricing struct {
|
||||
Unit string `yaml:"unit" json:"unit"`
|
||||
Rules []LLMPricingRuleProcessor `yaml:"rules" json:"rules"`
|
||||
}
|
||||
|
||||
// LLMPricingRuleRule is a single pricing rule inside the processor config.
|
||||
type LLMPricingRuleProcessor struct {
|
||||
Name string `yaml:"name" json:"name"`
|
||||
Pattern []string `yaml:"pattern" json:"pattern"`
|
||||
Cache LLMPricingRuleProcessorCache `yaml:"cache" json:"cache"`
|
||||
In float64 `yaml:"in" json:"in"`
|
||||
Out float64 `yaml:"out" json:"out"`
|
||||
}
|
||||
|
||||
// LLMPricingRuleCache describes how cached tokens are accounted for.
|
||||
type LLMPricingRuleProcessorCache struct {
|
||||
Mode string `yaml:"mode" json:"mode"`
|
||||
Read float64 `yaml:"read" json:"read"`
|
||||
Write float64 `yaml:"write" json:"write"`
|
||||
}
|
||||
|
||||
// LLMPricingRuleOutputAttrs maps the processor's computed cost fields to span attribute names.
|
||||
type LLMPricingRuleProcessorOutputAttrs struct {
|
||||
In string `yaml:"in" json:"in"`
|
||||
Out string `yaml:"out" json:"out"`
|
||||
CacheRead string `yaml:"cache_read" json:"cache_read"`
|
||||
CacheWrite string `yaml:"cache_write" json:"cache_write"`
|
||||
Total string `yaml:"total" json:"total"`
|
||||
}
|
||||
@@ -17,7 +17,7 @@ type StringSlice []string
|
||||
|
||||
// StorableLLMPricingRule is the bun/DB representation of an LLM pricing rule.
|
||||
type StorableLLMPricingRule struct {
|
||||
bun.BaseModel `bun:"table:llm_pricing_rules,alias:llm_pricing_rules"`
|
||||
bun.BaseModel `bun:"table:llm_pricing_rule,alias:llm_pricing_rule"`
|
||||
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
@@ -65,3 +65,31 @@ func (s *StringSlice) Scan(src any) error {
|
||||
}
|
||||
return json.Unmarshal(raw, s)
|
||||
}
|
||||
|
||||
func NewStorablePricingRuleFromUpdatable(orgID valuer.UUID, userEmail string, now time.Time, r UpdatableLLMPricingRule) *StorableLLMPricingRule {
|
||||
isOverride := true
|
||||
if r.IsOverride != nil {
|
||||
isOverride = *r.IsOverride
|
||||
} else if r.SourceID != nil {
|
||||
isOverride = false
|
||||
}
|
||||
|
||||
return &StorableLLMPricingRule{
|
||||
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
|
||||
TimeAuditable: types.TimeAuditable{CreatedAt: now, UpdatedAt: now},
|
||||
UserAuditable: types.UserAuditable{CreatedBy: userEmail, UpdatedBy: userEmail},
|
||||
OrgID: orgID,
|
||||
SourceID: r.SourceID,
|
||||
Model: r.Model,
|
||||
ModelPattern: r.ModelPattern,
|
||||
Unit: r.Unit,
|
||||
CacheMode: r.CacheMode,
|
||||
CostInput: r.CostInput,
|
||||
CostOutput: r.CostOutput,
|
||||
CostCacheRead: r.CostCacheRead,
|
||||
CostCacheWrite: r.CostCacheWrite,
|
||||
IsOverride: isOverride,
|
||||
SyncedAt: &now,
|
||||
Enabled: r.Enabled,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user