Compare commits

..

39 Commits

Author SHA1 Message Date
Piyush Singariya
9176ef0589 Merge branch 'main' into feat/json-qb-tests 2026-04-10 19:59:46 +05:30
Piyush Singariya
5c2a338189 test: updated validation 2026-04-10 19:59:29 +05:30
Piyush Singariya
704bab23cf Merge branch 'main' into feat/json-qb-tests 2026-04-09 18:04:25 +05:30
Piyush Singariya
371da26b3c fix: test 2026-04-09 18:03:57 +05:30
Piyush Singariya
97fbfbdc13 fix: type ambiguity 2026-04-09 13:58:13 +05:30
Piyush Singariya
4b112988ef Merge branch 'main' into feat/json-qb-tests 2026-04-09 12:19:26 +05:30
Piyush Singariya
a47ecf3907 test: with higher migrator version 2026-04-09 12:16:31 +05:30
Piyush Singariya
e4a78cf556 fix: dynamically change insert stmt for body_v2 availability 2026-04-08 22:00:05 +05:30
Piyush Singariya
b6adecc294 fix: better validations 2026-04-08 19:29:28 +05:30
Piyush Singariya
40333a5fee Merge branch 'main' into feat/json-qb-tests 2026-04-08 19:08:34 +05:30
Piyush Singariya
4af6a9abae Merge branch 'main' into feat/json-qb-tests 2026-04-07 21:49:21 +05:30
Piyush Singariya
55e892dad3 fix: body tests ready 2026-04-07 16:22:26 +05:30
Piyush Singariya
181116308f fix: logs.py 2026-04-07 15:56:21 +05:30
Piyush Singariya
eaa678910b fix: better tests 2026-04-07 14:49:52 +05:30
Piyush Singariya
e994caeb02 chore: import tests from older pr 2026-04-07 13:43:16 +05:30
Piyush Singariya
10840f8495 ci: lint changes 2026-04-07 12:31:22 +05:30
Piyush Singariya
1fcd3adfc8 Merge branch 'main' into fix/array-json 2026-04-07 12:29:35 +05:30
Piyush Singariya
3e14b26b00 fix: negative operator check 2026-04-07 12:26:44 +05:30
Piyush Singariya
b30bfa6371 fix: better review for test file 2026-04-02 12:53:44 +05:30
Piyush Singariya
e7f4a04b36 Merge branch 'main' into fix/array-json 2026-04-01 15:44:01 +05:30
Piyush Singariya
0687634da3 fix: stringified integer value input 2026-04-01 15:25:35 +05:30
Piyush Singariya
7e7732243e fix: dynamic array tests 2026-04-01 12:54:26 +05:30
Piyush Singariya
2f952e402f feat: change filtering of dynamic arrays 2026-04-01 12:09:32 +05:30
Piyush Singariya
a12febca4a fix: array json element comparison 2026-04-01 10:34:55 +05:30
Piyush Singariya
cb71c9c3f7 Merge branch 'main' into fix/array-json 2026-03-31 15:42:09 +05:30
Piyush Singariya
1cd4ce6509 Merge branch 'main' into fix/array-json 2026-03-31 14:55:36 +05:30
Piyush Singariya
9299c8ab18 fix: indexed unit tests 2026-03-30 15:47:47 +05:30
Piyush Singariya
24749de269 fix: comment 2026-03-30 15:16:28 +05:30
Piyush Singariya
39098ec3f4 fix: unit tests 2026-03-30 15:12:17 +05:30
Piyush Singariya
fe554f5c94 fix: remove not used paths from testdata 2026-03-30 14:24:48 +05:30
Piyush Singariya
8a60a041a6 fix: unit tests 2026-03-30 14:14:49 +05:30
Piyush Singariya
541f19c34a fix: array type filtering from dynamic arrays 2026-03-30 12:59:31 +05:30
Piyush Singariya
010db03d6e fix: indexed tests passing 2026-03-30 12:24:26 +05:30
Piyush Singariya
5408acbd8c fix: primitive conditions working 2026-03-30 12:01:35 +05:30
Piyush Singariya
0de6c85f81 feat: align negative operators to include other logs 2026-03-28 10:30:11 +05:30
Piyush Singariya
69ec24fa05 test: fix unit tests 2026-03-27 15:12:49 +05:30
Piyush Singariya
539d732b65 fix: contextual path index usage 2026-03-27 14:44:51 +05:30
Piyush Singariya
843d5fb199 Merge branch 'main' into feat/json-index 2026-03-27 14:17:52 +05:30
Piyush Singariya
fabdfb8cc1 feat: enable JSON Path index 2026-03-27 14:07:37 +05:30
21 changed files with 1868 additions and 1211 deletions

View File

@@ -52,6 +52,7 @@ jobs:
- ingestionkeys
- rootuser
- serviceaccount
- querier_json_body
sqlstore-provider:
- postgres
- sqlite
@@ -61,7 +62,7 @@ jobs:
- 25.5.6
- 25.12.5
schema-migrator-version:
- v0.142.0
- v0.144.2
postgres-version:
- 15
if: |

View File

@@ -1,169 +1,5 @@
components:
schemas:
Aio11YtypesCacheMode:
enum:
- subtract
- additive
- unknown
nullable: true
type: string
Aio11YtypesGettablePricingRule:
properties:
cache_mode:
$ref: '#/components/schemas/Aio11YtypesCacheMode'
cost_cache_read:
format: double
type: number
cost_cache_write:
format: double
type: number
cost_input:
format: double
type: number
cost_output:
format: double
type: number
created_at:
format: date-time
type: string
created_by:
type: string
enabled:
type: boolean
id:
type: string
is_override:
type: boolean
model_name:
type: string
model_pattern:
items:
type: string
nullable: true
type: array
synced_at:
format: date-time
nullable: true
type: string
unit:
$ref: '#/components/schemas/Aio11YtypesUnit'
updated_at:
format: date-time
type: string
updated_by:
type: string
required:
- id
- model_name
- model_pattern
- unit
- cache_mode
- cost_input
- cost_output
- cost_cache_read
- cost_cache_write
- is_override
- enabled
- created_at
- updated_at
- created_by
- updated_by
type: object
Aio11YtypesListPricingRulesResponse:
properties:
items:
items:
$ref: '#/components/schemas/Aio11YtypesGettablePricingRule'
nullable: true
type: array
limit:
type: integer
offset:
type: integer
total:
type: integer
required:
- items
- total
- offset
- limit
type: object
Aio11YtypesPostablePricingRule:
properties:
cache_mode:
$ref: '#/components/schemas/Aio11YtypesCacheMode'
cost_cache_read:
format: double
type: number
cost_cache_write:
format: double
type: number
cost_input:
format: double
type: number
cost_output:
format: double
type: number
enabled:
type: boolean
model_name:
type: string
model_pattern:
items:
type: string
nullable: true
type: array
unit:
$ref: '#/components/schemas/Aio11YtypesUnit'
required:
- model_name
- model_pattern
- unit
- cache_mode
- cost_input
- cost_output
- cost_cache_read
- cost_cache_write
- enabled
type: object
Aio11YtypesUnit:
enum:
- per_million_tokens
- per_token
nullable: true
type: string
Aio11YtypesUpdatablePricingRule:
properties:
cache_mode:
$ref: '#/components/schemas/Aio11YtypesCacheMode'
cost_cache_read:
nullable: true
type: number
cost_cache_write:
nullable: true
type: number
cost_input:
nullable: true
type: number
cost_output:
nullable: true
type: number
enabled:
nullable: true
type: boolean
is_override:
nullable: true
type: boolean
model_name:
nullable: true
type: string
model_pattern:
items:
type: string
type: array
unit:
$ref: '#/components/schemas/Aio11YtypesUnit'
type: object
AuthtypesAttributeMapping:
properties:
email:
@@ -3222,303 +3058,6 @@ info:
version: ""
openapi: 3.0.3
paths:
/api/v1/ai-o11y/pricing/rules:
get:
deprecated: false
description: Returns all LLM pricing rules for the authenticated org, with pagination.
operationId: ListPricingRules
parameters:
- in: query
name: offset
schema:
type: integer
- in: query
name: limit
schema:
type: integer
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/Aio11YtypesListPricingRulesResponse'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: List pricing rules
tags:
- ai-o11y
post:
deprecated: false
description: Creates a new LLM pricing rule for the org. Always sets is_override
= true.
operationId: CreatePricingRule
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/Aio11YtypesPostablePricingRule'
responses:
"201":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/Aio11YtypesGettablePricingRule'
status:
type: string
required:
- status
- data
type: object
description: Created
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"409":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Conflict
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- ADMIN
- tokenizer:
- ADMIN
summary: Create a pricing rule
tags:
- ai-o11y
/api/v1/ai-o11y/pricing/rules/{id}:
delete:
deprecated: false
description: Hard-deletes a pricing rule. If auto-synced, it will be recreated
on the next sync cycle.
operationId: DeletePricingRule
parameters:
- in: path
name: id
required: true
schema:
type: string
responses:
"204":
description: No Content
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- ADMIN
- tokenizer:
- ADMIN
summary: Delete a pricing rule
tags:
- ai-o11y
get:
deprecated: false
description: Returns a single LLM pricing rule by ID.
operationId: GetPricingRule
parameters:
- in: path
name: id
required: true
schema:
type: string
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/Aio11YtypesGettablePricingRule'
status:
type: string
required:
- status
- data
type: object
description: OK
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Get a pricing rule
tags:
- ai-o11y
put:
deprecated: false
description: Partially updates an existing pricing rule. Changing any cost field
sets is_override = true.
operationId: UpdatePricingRule
parameters:
- in: path
name: id
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/Aio11YtypesUpdatablePricingRule'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/Aio11YtypesGettablePricingRule'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- ADMIN
- tokenizer:
- ADMIN
summary: Update a pricing rule
tags:
- ai-o11y
/api/v1/authz/check:
post:
deprecated: false

View File

@@ -1,115 +0,0 @@
package signozapiserver
import (
"net/http"
"github.com/SigNoz/signoz/pkg/http/handler"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/aio11ypricingruletypes"
"github.com/gorilla/mux"
)
func (provider *provider) addAIO11yRoutes(router *mux.Router) error {
if err := router.Handle("/api/v1/ai-o11y/pricing/rules", handler.New(
provider.authZ.ViewAccess(provider.aiO11yPricingRuleHandler.List),
handler.OpenAPIDef{
ID: "ListPricingRules",
Tags: []string{"ai-o11y"},
Summary: "List pricing rules",
Description: "Returns all LLM pricing rules for the authenticated org, with pagination.",
Request: nil,
RequestContentType: "",
RequestQuery: new(aio11ypricingruletypes.ListPricingRulesQuery),
Response: new(aio11ypricingruletypes.ListPricingRulesResponse),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodGet).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/ai-o11y/pricing/rules", handler.New(
provider.authZ.AdminAccess(provider.aiO11yPricingRuleHandler.Create),
handler.OpenAPIDef{
ID: "CreatePricingRule",
Tags: []string{"ai-o11y"},
Summary: "Create a pricing rule",
Description: "Creates a new LLM pricing rule for the org. Always sets is_override = true.",
Request: new(aio11ypricingruletypes.PostablePricingRule),
RequestContentType: "application/json",
Response: new(aio11ypricingruletypes.GettablePricingRule),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusCreated,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusConflict},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleAdmin),
},
)).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/ai-o11y/pricing/rules/{id}", handler.New(
provider.authZ.ViewAccess(provider.aiO11yPricingRuleHandler.Get),
handler.OpenAPIDef{
ID: "GetPricingRule",
Tags: []string{"ai-o11y"},
Summary: "Get a pricing rule",
Description: "Returns a single LLM pricing rule by ID.",
Request: nil,
RequestContentType: "",
Response: new(aio11ypricingruletypes.GettablePricingRule),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusNotFound},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodGet).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/ai-o11y/pricing/rules/{id}", handler.New(
provider.authZ.AdminAccess(provider.aiO11yPricingRuleHandler.Update),
handler.OpenAPIDef{
ID: "UpdatePricingRule",
Tags: []string{"ai-o11y"},
Summary: "Update a pricing rule",
Description: "Partially updates an existing pricing rule. Changing any cost field sets is_override = true.",
Request: new(aio11ypricingruletypes.UpdatablePricingRule),
RequestContentType: "application/json",
Response: new(aio11ypricingruletypes.GettablePricingRule),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleAdmin),
},
)).Methods(http.MethodPut).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/ai-o11y/pricing/rules/{id}", handler.New(
provider.authZ.AdminAccess(provider.aiO11yPricingRuleHandler.Delete),
handler.OpenAPIDef{
ID: "DeletePricingRule",
Tags: []string{"ai-o11y"},
Summary: "Delete a pricing rule",
Description: "Hard-deletes a pricing rule. If auto-synced, it will be recreated on the next sync cycle.",
Request: nil,
RequestContentType: "",
Response: nil,
ResponseContentType: "",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusNotFound},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleAdmin),
},
)).Methods(http.MethodDelete).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/http/handler"
"github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/modules/aio11ypricingrule"
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
@@ -33,32 +32,31 @@ import (
)
type provider struct {
config apiserver.Config
settings factory.ScopedProviderSettings
router *mux.Router
authZ *middleware.AuthZ
orgHandler organization.Handler
userHandler user.Handler
sessionHandler session.Handler
authDomainHandler authdomain.Handler
preferenceHandler preference.Handler
globalHandler global.Handler
promoteHandler promote.Handler
flaggerHandler flagger.Handler
dashboardModule dashboard.Module
dashboardHandler dashboard.Handler
metricsExplorerHandler metricsexplorer.Handler
gatewayHandler gateway.Handler
fieldsHandler fields.Handler
authzHandler authz.Handler
rawDataExportHandler rawdataexport.Handler
zeusHandler zeus.Handler
querierHandler querier.Handler
serviceAccountHandler serviceaccount.Handler
factoryHandler factory.Handler
cloudIntegrationHandler cloudintegration.Handler
ruleStateHistoryHandler rulestatehistory.Handler
aiO11yPricingRuleHandler aio11ypricingrule.Handler
config apiserver.Config
settings factory.ScopedProviderSettings
router *mux.Router
authZ *middleware.AuthZ
orgHandler organization.Handler
userHandler user.Handler
sessionHandler session.Handler
authDomainHandler authdomain.Handler
preferenceHandler preference.Handler
globalHandler global.Handler
promoteHandler promote.Handler
flaggerHandler flagger.Handler
dashboardModule dashboard.Module
dashboardHandler dashboard.Handler
metricsExplorerHandler metricsexplorer.Handler
gatewayHandler gateway.Handler
fieldsHandler fields.Handler
authzHandler authz.Handler
rawDataExportHandler rawdataexport.Handler
zeusHandler zeus.Handler
querierHandler querier.Handler
serviceAccountHandler serviceaccount.Handler
factoryHandler factory.Handler
cloudIntegrationHandler cloudintegration.Handler
ruleStateHistoryHandler rulestatehistory.Handler
}
func NewFactory(
@@ -85,7 +83,6 @@ func NewFactory(
factoryHandler factory.Handler,
cloudIntegrationHandler cloudintegration.Handler,
ruleStateHistoryHandler rulestatehistory.Handler,
aiO11yPricingRuleHandler aio11ypricingrule.Handler,
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
return newProvider(
@@ -115,7 +112,6 @@ func NewFactory(
factoryHandler,
cloudIntegrationHandler,
ruleStateHistoryHandler,
aiO11yPricingRuleHandler,
)
})
}
@@ -147,37 +143,35 @@ func newProvider(
factoryHandler factory.Handler,
cloudIntegrationHandler cloudintegration.Handler,
ruleStateHistoryHandler rulestatehistory.Handler,
aiO11yPricingRuleHandler aio11ypricingrule.Handler,
) (apiserver.APIServer, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
router := mux.NewRouter().UseEncodedPath()
provider := &provider{
config: config,
settings: settings,
router: router,
orgHandler: orgHandler,
userHandler: userHandler,
sessionHandler: sessionHandler,
authDomainHandler: authDomainHandler,
preferenceHandler: preferenceHandler,
globalHandler: globalHandler,
promoteHandler: promoteHandler,
flaggerHandler: flaggerHandler,
dashboardModule: dashboardModule,
dashboardHandler: dashboardHandler,
metricsExplorerHandler: metricsExplorerHandler,
gatewayHandler: gatewayHandler,
fieldsHandler: fieldsHandler,
authzHandler: authzHandler,
rawDataExportHandler: rawDataExportHandler,
zeusHandler: zeusHandler,
querierHandler: querierHandler,
serviceAccountHandler: serviceAccountHandler,
factoryHandler: factoryHandler,
cloudIntegrationHandler: cloudIntegrationHandler,
ruleStateHistoryHandler: ruleStateHistoryHandler,
aiO11yPricingRuleHandler: aiO11yPricingRuleHandler,
config: config,
settings: settings,
router: router,
orgHandler: orgHandler,
userHandler: userHandler,
sessionHandler: sessionHandler,
authDomainHandler: authDomainHandler,
preferenceHandler: preferenceHandler,
globalHandler: globalHandler,
promoteHandler: promoteHandler,
flaggerHandler: flaggerHandler,
dashboardModule: dashboardModule,
dashboardHandler: dashboardHandler,
metricsExplorerHandler: metricsExplorerHandler,
gatewayHandler: gatewayHandler,
fieldsHandler: fieldsHandler,
authzHandler: authzHandler,
rawDataExportHandler: rawDataExportHandler,
zeusHandler: zeusHandler,
querierHandler: querierHandler,
serviceAccountHandler: serviceAccountHandler,
factoryHandler: factoryHandler,
cloudIntegrationHandler: cloudIntegrationHandler,
ruleStateHistoryHandler: ruleStateHistoryHandler,
}
provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
@@ -278,10 +272,6 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
return err
}
if err := provider.addAIO11yRoutes(router); err != nil {
return err
}
return nil
}

View File

@@ -1,26 +0,0 @@
package aio11ypricingrule
import (
"context"
"net/http"
"github.com/SigNoz/signoz/pkg/types/aio11ypricingruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Module interface {
List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*aio11ypricingruletypes.PricingRule, int, error)
Get(ctx context.Context, orgID valuer.UUID, id valuer.UUID) (*aio11ypricingruletypes.PricingRule, error)
Create(ctx context.Context, orgID valuer.UUID, createdBy string, req *aio11ypricingruletypes.PostablePricingRule) (*aio11ypricingruletypes.PricingRule, error)
Update(ctx context.Context, orgID valuer.UUID, id valuer.UUID, updatedBy string, req *aio11ypricingruletypes.UpdatablePricingRule) (*aio11ypricingruletypes.PricingRule, error)
Delete(ctx context.Context, orgID valuer.UUID, id valuer.UUID) error
}
// Handler defines the HTTP handler interface for pricing rule endpoints.
type Handler interface {
List(rw http.ResponseWriter, r *http.Request)
Get(rw http.ResponseWriter, r *http.Request)
Create(rw http.ResponseWriter, r *http.Request)
Update(rw http.ResponseWriter, r *http.Request)
Delete(rw http.ResponseWriter, r *http.Request)
}

View File

@@ -1,226 +0,0 @@
package implaio11ypricingrule
import (
"context"
"net/http"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/http/binding"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/aio11ypricingrule"
"github.com/SigNoz/signoz/pkg/types/aio11ypricingruletypes"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
)
const maxLimit = 100
type handler struct {
module aio11ypricingrule.Module
providerSettings factory.ProviderSettings
}
func NewHandler(module aio11ypricingrule.Module, providerSettings factory.ProviderSettings) aio11ypricingrule.Handler {
return &handler{module: module, providerSettings: providerSettings}
}
// List handles GET /api/v1/ai-o11y/pricing/rules.
func (h *handler) List(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
var q aio11ypricingruletypes.ListPricingRulesQuery
if err := binding.Query.BindQuery(r.URL.Query(), &q); err != nil {
render.Error(rw, err)
return
}
if q.Limit <= 0 {
q.Limit = 20
} else if q.Limit > maxLimit {
q.Limit = maxLimit
}
if q.Offset < 0 {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, aio11ypricingruletypes.ErrCodePricingRuleInvalidInput, "offset must be a non-negative integer"))
return
}
rules, total, err := h.module.List(ctx, orgID, q.Offset, q.Limit)
if err != nil {
render.Error(rw, err)
return
}
items := make([]*aio11ypricingruletypes.GettablePricingRule, len(rules))
for i, r := range rules {
items[i] = aio11ypricingruletypes.NewGettablePricingRule(r)
}
render.Success(rw, http.StatusOK, &aio11ypricingruletypes.ListPricingRulesResponse{
Items: items,
Total: total,
Offset: q.Offset,
Limit: q.Limit,
})
}
// Get handles GET /api/v1/ai-o11y/pricing/rules/{id}.
func (h *handler) Get(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
id, err := ruleIDFromPath(r)
if err != nil {
render.Error(rw, err)
return
}
rule, err := h.module.Get(ctx, orgID, id)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, aio11ypricingruletypes.NewGettablePricingRule(rule))
}
// Create handles POST /api/v1/ai-o11y/pricing/rules.
func (h *handler) Create(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
req := new(aio11ypricingruletypes.PostablePricingRule)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
rule, err := h.module.Create(ctx, orgID, claims.Email, req)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusCreated, aio11ypricingruletypes.NewGettablePricingRule(rule))
}
// Update handles PUT /api/v1/ai-o11y/pricing/rules/{id}
func (h *handler) Update(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
id, err := ruleIDFromPath(r)
if err != nil {
render.Error(rw, err)
return
}
req := new(aio11ypricingruletypes.UpdatablePricingRule)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
rule, err := h.module.Update(ctx, orgID, id, claims.Email, req)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, aio11ypricingruletypes.NewGettablePricingRule(rule))
}
// Delete handles DELETE /api/v1/ai-o11y/pricing/rules/{id}
func (h *handler) Delete(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
id, err := ruleIDFromPath(r)
if err != nil {
render.Error(rw, err)
return
}
if err := h.module.Delete(ctx, orgID, id); err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusNoContent, nil)
}
// ruleIDFromPath extracts and validates the {id} path variable.
func ruleIDFromPath(r *http.Request) (valuer.UUID, error) {
raw := mux.Vars(r)["id"]
if raw == "" {
return valuer.UUID{}, errors.Newf(errors.TypeInvalidInput, aio11ypricingruletypes.ErrCodePricingRuleInvalidInput, "id is missing from the path")
}
id, err := valuer.NewUUID(raw)
if err != nil {
return valuer.UUID{}, errors.Wrapf(err, errors.TypeInvalidInput, aio11ypricingruletypes.ErrCodePricingRuleInvalidInput, "id is not a valid uuid")
}
return id, nil
}

View File

@@ -10,7 +10,6 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/global/signozglobal"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/modules/aio11ypricingrule"
"github.com/SigNoz/signoz/pkg/modules/apdex"
"github.com/SigNoz/signoz/pkg/modules/apdex/implapdex"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
@@ -43,27 +42,26 @@ import (
)
type Handlers struct {
SavedView savedview.Handler
Apdex apdex.Handler
Dashboard dashboard.Handler
QuickFilter quickfilter.Handler
TraceFunnel tracefunnel.Handler
RawDataExport rawdataexport.Handler
SpanPercentile spanpercentile.Handler
Services services.Handler
MetricsExplorer metricsexplorer.Handler
Global global.Handler
FlaggerHandler flagger.Handler
GatewayHandler gateway.Handler
Fields fields.Handler
AuthzHandler authz.Handler
ZeusHandler zeus.Handler
QuerierHandler querier.Handler
ServiceAccountHandler serviceaccount.Handler
RegistryHandler factory.Handler
CloudIntegrationHandler cloudintegration.Handler
RuleStateHistory rulestatehistory.Handler
AIO11yPricingRuleHandler aio11ypricingrule.Handler
SavedView savedview.Handler
Apdex apdex.Handler
Dashboard dashboard.Handler
QuickFilter quickfilter.Handler
TraceFunnel tracefunnel.Handler
RawDataExport rawdataexport.Handler
SpanPercentile spanpercentile.Handler
Services services.Handler
MetricsExplorer metricsexplorer.Handler
Global global.Handler
FlaggerHandler flagger.Handler
GatewayHandler gateway.Handler
Fields fields.Handler
AuthzHandler authz.Handler
ZeusHandler zeus.Handler
QuerierHandler querier.Handler
ServiceAccountHandler serviceaccount.Handler
RegistryHandler factory.Handler
CloudIntegrationHandler cloudintegration.Handler
RuleStateHistory rulestatehistory.Handler
}
func NewHandlers(

View File

@@ -16,7 +16,6 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/http/handler"
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/modules/aio11ypricingrule"
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
@@ -70,7 +69,6 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
struct{ factory.Handler }{},
struct{ cloudintegration.Handler }{},
struct{ rulestatehistory.Handler }{},
struct{ aio11ypricingrule.Handler }{},
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
if err != nil {
return nil, err

View File

@@ -3,6 +3,8 @@ package signoz
import (
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/alertmanager/nfmanager"
"github.com/SigNoz/signoz/pkg/auditor"
"github.com/SigNoz/signoz/pkg/auditor/noopauditor"
"github.com/SigNoz/signoz/pkg/alertmanager/nfmanager/rulebasednotification"
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
"github.com/SigNoz/signoz/pkg/analytics"
@@ -10,8 +12,6 @@ import (
"github.com/SigNoz/signoz/pkg/analytics/segmentanalytics"
"github.com/SigNoz/signoz/pkg/apiserver"
"github.com/SigNoz/signoz/pkg/apiserver/signozapiserver"
"github.com/SigNoz/signoz/pkg/auditor"
"github.com/SigNoz/signoz/pkg/auditor/noopauditor"
"github.com/SigNoz/signoz/pkg/authz"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/memorycache"
@@ -288,7 +288,6 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
handlers.RegistryHandler,
handlers.CloudIntegrationHandler,
handlers.RuleStateHistory,
handlers.AIO11yPricingRuleHandler,
),
)
}

View File

@@ -1,9 +0,0 @@
package aio11ypricingruletypes
import "github.com/SigNoz/signoz/pkg/errors"
var (
ErrCodePricingRuleNotFound = errors.MustNewCode("pricing_rule_not_found")
ErrCodePricingRuleAlreadyExists = errors.MustNewCode("pricing_rule_already_exists")
ErrCodePricingRuleInvalidInput = errors.MustNewCode("pricing_rule_invalid_input")
)

View File

@@ -1,91 +0,0 @@
package aio11ypricingruletypes
import "time"
// GettablePricingRule is the HTTP response representation of a pricing rule.
type GettablePricingRule struct {
ID string `json:"id" required:"true"`
Model string `json:"model_name" required:"true"`
ModelPattern []string `json:"model_pattern" required:"true"`
Unit Unit `json:"unit" required:"true"`
CacheMode CacheMode `json:"cache_mode" required:"true"`
CostInput float64 `json:"cost_input" required:"true"`
CostOutput float64 `json:"cost_output" required:"true"`
CostCacheRead float64 `json:"cost_cache_read" required:"true"`
CostCacheWrite float64 `json:"cost_cache_write" required:"true"`
IsOverride bool `json:"is_override" required:"true"`
SyncedAt *time.Time `json:"synced_at,omitempty"`
Enabled bool `json:"enabled" required:"true"`
CreatedAt time.Time `json:"created_at" required:"true"`
UpdatedAt time.Time `json:"updated_at" required:"true"`
CreatedBy string `json:"created_by" required:"true"`
UpdatedBy string `json:"updated_by" required:"true"`
}
// NewGettablePricingRule converts a domain PricingRule to a GettablePricingRule.
func NewGettablePricingRule(r *PricingRule) *GettablePricingRule {
pattern := make([]string, len(r.ModelPattern))
copy(pattern, r.ModelPattern)
return &GettablePricingRule{
ID: r.ID,
Model: r.Model,
ModelPattern: pattern,
Unit: r.Unit,
CacheMode: r.CacheMode,
CostInput: r.CostInput,
CostOutput: r.CostOutput,
CostCacheRead: r.CostCacheRead,
CostCacheWrite: r.CostCacheWrite,
IsOverride: r.IsOverride,
SyncedAt: r.SyncedAt,
Enabled: r.Enabled,
CreatedAt: r.CreatedAt,
UpdatedAt: r.UpdatedAt,
CreatedBy: r.CreatedBy,
UpdatedBy: r.UpdatedBy,
}
}
// PostablePricingRule is the HTTP request body for creating a pricing rule.
// All fields are required.
type PostablePricingRule struct {
Model string `json:"model_name" required:"true"`
ModelPattern []string `json:"model_pattern" required:"true"`
Unit Unit `json:"unit" required:"true"`
CacheMode CacheMode `json:"cache_mode" required:"true"`
CostInput float64 `json:"cost_input" required:"true"`
CostOutput float64 `json:"cost_output" required:"true"`
CostCacheRead float64 `json:"cost_cache_read" required:"true"`
CostCacheWrite float64 `json:"cost_cache_write" required:"true"`
Enabled bool `json:"enabled" required:"true"`
}
// UpdatablePricingRule is the HTTP request body for updating a pricing rule.
// All fields are optional; only non-nil fields are applied.
type UpdatablePricingRule struct {
Model *string `json:"model_name,omitempty"`
ModelPattern []string `json:"model_pattern,omitempty"`
Unit *Unit `json:"unit,omitempty"`
CacheMode *CacheMode `json:"cache_mode,omitempty"`
CostInput *float64 `json:"cost_input,omitempty"`
CostOutput *float64 `json:"cost_output,omitempty"`
CostCacheRead *float64 `json:"cost_cache_read,omitempty"`
CostCacheWrite *float64 `json:"cost_cache_write,omitempty"`
IsOverride *bool `json:"is_override,omitempty"`
Enabled *bool `json:"enabled,omitempty"`
}
// ListPricingRulesQuery holds the pagination parameters for listing pricing rules.
type ListPricingRulesQuery struct {
Offset int `query:"offset" json:"offset"`
Limit int `query:"limit" json:"limit"`
}
// ListPricingRulesResponse is the paginated response for listing pricing rules.
type ListPricingRulesResponse struct {
Items []*GettablePricingRule `json:"items" required:"true" nullable:"true"`
Total int `json:"total" required:"true"`
Offset int `json:"offset" required:"true"`
Limit int `json:"limit" required:"true"`
}

View File

@@ -1,80 +0,0 @@
package aio11ypricingruletypes
import (
"time"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Unit string
const (
UnitPerMillionTokens Unit = "per_million_tokens"
)
func (Unit) Enum() []any {
return []any{UnitPerMillionTokens}
}
type CacheMode string
const (
// CacheModeSubtract: cached tokens are inside input_tokens (OpenAI-style).
CacheModeSubtract CacheMode = "subtract"
// CacheModeAdditive: cached tokens are reported separately (Anthropic-style).
CacheModeAdditive CacheMode = "additive"
// CacheModeUnknown: provider behaviour is unknown; falls back to subtract.
CacheModeUnknown CacheMode = "unknown"
)
func (CacheMode) Enum() []any {
return []any{CacheModeSubtract, CacheModeAdditive, CacheModeUnknown}
}
// PricingRule is the domain model for an LLM pricing rule.
// It has no serialisation concerns — use GettablePricingRule for HTTP responses.
type PricingRule struct {
types.TimeAuditable
types.UserAuditable
ID string
OrgID valuer.UUID
Model string
ModelPattern []string
Unit Unit
CacheMode CacheMode
CostInput float64
CostOutput float64
CostCacheRead float64
CostCacheWrite float64
// IsOverride marks that cost fields were manually edited.
// When true the sync job skips this rule.
IsOverride bool
SyncedAt *time.Time
Enabled bool
}
// NewPricingRuleFromStorable converts a StorablePricingRule to a PricingRule.
func NewPricingRuleFromStorable(s *StorablePricingRule) *PricingRule {
pattern := make([]string, len(s.ModelPattern))
copy(pattern, s.ModelPattern)
return &PricingRule{
TimeAuditable: s.TimeAuditable,
UserAuditable: s.UserAuditable,
ID: s.ID.StringValue(),
OrgID: s.OrgID,
Model: s.Model,
ModelPattern: pattern,
Unit: s.Unit,
CacheMode: s.CacheMode,
CostInput: s.CostInput,
CostOutput: s.CostOutput,
CostCacheRead: s.CostCacheRead,
CostCacheWrite: s.CostCacheWrite,
IsOverride: s.IsOverride,
SyncedAt: s.SyncedAt,
Enabled: s.Enabled,
}
}

View File

@@ -1,70 +0,0 @@
package aio11ypricingruletypes
import (
"database/sql/driver"
"encoding/json"
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
// StringSlice is a []string that is stored as a JSON text column.
// It is compatible with both SQLite and PostgreSQL.
type StringSlice []string
func (s StringSlice) Value() (driver.Value, error) {
if s == nil {
return "[]", nil
}
b, err := json.Marshal(s)
if err != nil {
return nil, err
}
return string(b), nil
}
func (s *StringSlice) Scan(src any) error {
var raw []byte
switch v := src.(type) {
case string:
raw = []byte(v)
case []byte:
raw = v
case nil:
*s = nil
return nil
default:
return fmt.Errorf("aio11ypricingruletypes: cannot scan %T into StringSlice", src)
}
return json.Unmarshal(raw, s)
}
// StorablePricingRule is the bun/DB representation of an LLM pricing rule.
type StorablePricingRule struct {
bun.BaseModel `bun:"table:llm_pricing_rules,alias:llm_pricing_rules"`
types.Identifiable
types.TimeAuditable
types.UserAuditable
OrgID valuer.UUID `bun:"org_id,type:text,notnull"`
Model string `bun:"model,type:text,notnull"`
ModelPattern StringSlice `bun:"model_pattern,type:text,notnull"`
Unit Unit `bun:"unit,type:text,notnull"`
CacheMode CacheMode `bun:"cache_mode,type:text,notnull"`
CostInput float64 `bun:"cost_input,notnull"`
CostOutput float64 `bun:"cost_output,notnull"`
CostCacheRead float64 `bun:"cost_cache_read,notnull"`
CostCacheWrite float64 `bun:"cost_cache_write,notnull"`
// IsOverride marks that cost fields were manually edited.
// When true the sync job skips this rule.
IsOverride bool `bun:"is_override,notnull,default:false"`
// SourceConfig holds the last upstream pricing snapshot; used to revert
// when IsOverride is cleared.
SourceConfig string `bun:"source_config,type:text,notnull,default:'{}'"`
SyncedAt *time.Time `bun:"synced_at"`
Enabled bool `bun:"enabled,notnull,default:true"`
}

View File

@@ -1,19 +0,0 @@
package aio11ypricingruletypes
import (
"context"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Store interface {
List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*StorablePricingRule, int, error)
Get(ctx context.Context, orgID valuer.UUID, id valuer.UUID) (*StorablePricingRule, error)
Create(ctx context.Context, rule *StorablePricingRule) error
Update(ctx context.Context, rule *StorablePricingRule) error
Delete(ctx context.Context, orgID valuer.UUID, id valuer.UUID) error
}

View File

@@ -23,6 +23,7 @@ pytest_plugins = [
"fixtures.notification_channel",
"fixtures.alerts",
"fixtures.cloudintegrations",
"fixtures.jsontypeexporter",
]

View File

@@ -0,0 +1,437 @@
"""
Simpler version of jsontypeexporter for test fixtures.
This exports JSON type metadata to the path_types table by parsing JSON bodies
and extracting all paths with their types, similar to how the real jsontypeexporter works.
"""
import datetime
import json
from abc import ABC
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generator,
List,
Optional,
Set,
Union,
)
import numpy as np
import pytest
from fixtures import types
if TYPE_CHECKING:
pass
class JSONPathType(ABC):
"""Represents a JSON path with its type information"""
path: str
type: str
last_seen: np.uint64
def __init__(
self,
path: str,
type: str, # pylint: disable=redefined-builtin
last_seen: Optional[datetime.datetime] = None,
) -> None:
self.path = path
self.type = type
if last_seen is None:
last_seen = datetime.datetime.now()
self.last_seen = np.uint64(int(last_seen.timestamp() * 1e9))
def np_arr(self) -> np.array:
"""Return path type data as numpy array for database insertion"""
return np.array([self.path, self.type, self.last_seen])
# Constants matching jsontypeexporter
ARRAY_SEPARATOR = "[]." # Used in paths like "education[].name"
ARRAY_SUFFIX = "[]" # Used when traversing into array element objects
def _infer_array_type_from_type_strings(types: List[str]) -> Optional[str]:
"""
Infer array type from a list of pre-classified type strings.
Matches jsontypeexporter's inferArrayMask logic (v0.144.2+).
Type strings are: "JSON", "String", "Bool", "Float64", "Int64"
SuperTyping rules (matching Go inferArrayMask):
- JSON alone → Array(JSON)
- JSON + any primitive → Array(Dynamic)
- String alone → Array(Nullable(String)); String + other → Array(Dynamic)
- Float64 wins over Int64 and Bool
- Int64 wins over Bool
- Bool alone → Array(Nullable(Bool))
"""
if len(types) == 0:
return None
unique = set(types)
has_json = "JSON" in unique
# hasPrimitive mirrors Go: (hasJSON && len(unique) > 1) || (!hasJSON && len(unique) > 0)
has_primitive = (has_json and len(unique) > 1) or (not has_json and len(unique) > 0)
if has_json:
if not has_primitive:
return "Array(JSON)"
return "Array(Dynamic)"
# ---- Primitive Type Resolution (Float > Int > Bool) ----
if "String" in unique:
if len(unique) > 1:
return "Array(Dynamic)"
return "Array(Nullable(String))"
if "Float64" in unique:
return "Array(Nullable(Float64))"
if "Int64" in unique:
return "Array(Nullable(Int64))"
if "Bool" in unique:
return "Array(Nullable(Bool))"
return "Array(Dynamic)"
def _infer_array_type(elements: List[Any]) -> Optional[str]:
"""
Infer array type from raw Python list elements.
Classifies each element then delegates to _infer_array_type_from_type_strings.
"""
if len(elements) == 0:
return None
types = []
for elem in elements:
if elem is None:
continue
if isinstance(elem, dict):
types.append("JSON")
elif isinstance(elem, str):
types.append("String")
elif isinstance(elem, bool): # must be before int (bool is subclass of int)
types.append("Bool")
elif isinstance(elem, float):
types.append("Float64")
elif isinstance(elem, int):
types.append("Int64")
return _infer_array_type_from_type_strings(types)
def _python_type_to_clickhouse_type(value: Any) -> str:
"""
Convert Python type to ClickHouse JSON type string.
Maps Python types to ClickHouse JSON data types.
"""
if value is None:
return "String" # Default for null values
if isinstance(value, bool):
return "Bool"
elif isinstance(value, int):
return "Int64"
elif isinstance(value, float):
return "Float64"
elif isinstance(value, str):
return "String"
elif isinstance(value, list):
# Use the sophisticated array type inference
array_type = _infer_array_type(value)
return array_type if array_type else "Array(Dynamic)"
elif isinstance(value, dict):
return "JSON"
else:
return "String" # Default fallback
def _extract_json_paths(
obj: Any,
current_path: str = "",
path_types: Optional[Dict[str, Set[str]]] = None,
level: int = 0,
) -> Dict[str, Set[str]]:
"""
Recursively extract all paths and their types from a JSON object.
Matches jsontypeexporter's analyzePValue logic.
Args:
obj: The JSON object to traverse
current_path: Current path being built (e.g., "user.name")
path_types: Dictionary mapping paths to sets of types found
level: Current nesting level (for depth limiting)
Returns:
Dictionary mapping paths to sets of type strings
"""
if path_types is None:
path_types = {}
if obj is None:
if current_path:
if current_path not in path_types:
path_types[current_path] = set()
path_types[current_path].add("String") # Null defaults to String
return path_types
if isinstance(obj, dict):
# For objects, add the object itself and recurse into keys
if current_path:
if current_path not in path_types:
path_types[current_path] = set()
path_types[current_path].add("JSON")
for key, value in obj.items():
# Build the path for this key
if current_path:
new_path = f"{current_path}.{key}"
else:
new_path = key
# Recurse into the value
_extract_json_paths(value, new_path, path_types, level + 1)
elif isinstance(obj, list):
# Skip empty arrays
if len(obj) == 0:
return path_types
# Collect types from array elements (matching Go: types := make([]pcommon.ValueType, 0, s.Len()))
types = []
for item in obj:
if isinstance(item, dict):
# When traversing into array element objects, use ArraySuffix ([])
# This matches: prefix+ArraySuffix in the Go code
# Example: if current_path is "education", we use "education[]" to traverse into objects
array_prefix = current_path + ARRAY_SUFFIX if current_path else ""
for key, value in item.items():
if array_prefix:
# Use array separator: education[].name
array_path = f"{array_prefix}.{key}"
else:
array_path = key
# Recurse without increasing level (matching Go behavior)
_extract_json_paths(value, array_path, path_types, level)
types.append("JSON")
elif isinstance(item, list):
# Arrays inside arrays are not supported - skip the whole path
# Matching Go: e.logger.Error("arrays inside arrays are not supported!", ...); return nil
return path_types
elif isinstance(item, str):
types.append("String")
elif isinstance(item, bool):
types.append("Bool")
elif isinstance(item, float):
types.append("Float64")
elif isinstance(item, int):
types.append("Int64")
# Infer array type from collected types (matching Go: if mask := inferArrayMask(types); mask != 0)
if len(types) > 0:
array_type = _infer_array_type_from_type_strings(types)
if array_type and current_path:
if current_path not in path_types:
path_types[current_path] = set()
path_types[current_path].add(array_type)
else:
# Primitive value (string, number, bool)
if current_path:
if current_path not in path_types:
path_types[current_path] = set()
obj_type = _python_type_to_clickhouse_type(obj)
path_types[current_path].add(obj_type)
return path_types
def _parse_json_bodies_and_extract_paths(
json_bodies: List[str],
timestamp: Optional[datetime.datetime] = None,
) -> List[JSONPathType]:
"""
Parse JSON bodies and extract all paths with their types.
This mimics the behavior of jsontypeexporter.
Args:
json_bodies: List of JSON body strings to parse
timestamp: Timestamp to use for last_seen (defaults to now)
Returns:
List of JSONPathType objects with all discovered paths and types
"""
if timestamp is None:
timestamp = datetime.datetime.now()
# Aggregate all paths and their types across all JSON bodies
all_path_types: Dict[str, Set[str]] = {}
for json_body in json_bodies:
try:
parsed = json.loads(json_body)
_extract_json_paths(parsed, "", all_path_types, level=0)
except (json.JSONDecodeError, TypeError):
# Skip invalid JSON
continue
# Convert to list of JSONPathType objects
# Each path can have multiple types, so we create one JSONPathType per type
path_type_objects: List[JSONPathType] = []
for path, types_set in all_path_types.items():
for type_str in types_set:
path_type_objects.append(
JSONPathType(path=path, type=type_str, last_seen=timestamp)
)
return path_type_objects
@pytest.fixture(name="export_json_types", scope="function")
def export_json_types(
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest, # To access migrator fixture
) -> Generator[
Callable[[Union[List[JSONPathType], List[str], List[Any]]], None], Any, None
]:
"""
Fixture for exporting JSON type metadata to the path_types table.
This is a simpler version of jsontypeexporter for test fixtures.
The function can accept:
1. List of JSONPathType objects (manual specification)
2. List of JSON body strings (auto-extract paths)
3. List of Logs objects (extract from body_json field)
Usage examples:
# Manual specification
export_json_types([
JSONPathType(path="user.name", type="String"),
JSONPathType(path="user.age", type="Int64"),
])
# Auto-extract from JSON strings
export_json_types([
'{"user": {"name": "alice", "age": 25}}',
'{"user": {"name": "bob", "age": 30}}',
])
# Auto-extract from Logs objects
export_json_types(logs_list)
"""
# Ensure migrator has run to create the table
try:
request.getfixturevalue("migrator")
except Exception:
# If migrator fixture is not available, that's okay - table might already exist
pass
def _export_json_types(
data: Union[
List[JSONPathType], List[str], List[Any]
], # List[Logs] but avoiding circular import
) -> None:
"""
Export JSON type metadata to signoz_metadata.distributed_json_path_types table.
This table stores path and type information for body JSON fields.
"""
path_types: List[JSONPathType] = []
if len(data) == 0:
return
# Determine input type and convert to JSONPathType list
first_item = data[0]
if isinstance(first_item, JSONPathType):
# Already JSONPathType objects
path_types = data # type: ignore
elif isinstance(first_item, str):
# List of JSON strings - parse and extract paths
path_types = _parse_json_bodies_and_extract_paths(data) # type: ignore
else:
# Assume it's a list of Logs objects - extract body_v2
json_bodies: List[str] = []
for log in data: # type: ignore
# Try to get body_v2 attribute
if hasattr(log, "body_v2") and log.body_v2:
json_bodies.append(log.body_v2)
elif hasattr(log, "body") and log.body:
# Fallback to body if body_v2 not available
try:
# Try to parse as JSON
json.loads(log.body)
json_bodies.append(log.body)
except (json.JSONDecodeError, TypeError):
pass
if json_bodies:
path_types = _parse_json_bodies_and_extract_paths(json_bodies)
if len(path_types) == 0:
return
clickhouse.conn.insert(
database="signoz_metadata",
table="distributed_json_path_types",
data=[path_type.np_arr() for path_type in path_types],
column_names=[
"path",
"type",
"last_seen",
],
)
yield _export_json_types
# Cleanup - truncate the local table after tests (following pattern from logs fixture)
clickhouse.conn.query(
f"TRUNCATE TABLE signoz_metadata.json_path_types ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
)
@pytest.fixture(name="export_promoted_paths", scope="function")
def export_promoted_paths(
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest, # To access migrator fixture
) -> Generator[Callable[[List[str]], None], Any, None]:
"""
Fixture for exporting promoted JSON paths to the promoted paths table.
"""
# Ensure migrator has run to create the table
try:
request.getfixturevalue("migrator")
except Exception:
# If migrator fixture is not available, that's okay - table might already exist
pass
def _export_promoted_paths(paths: List[str]) -> None:
if len(paths) == 0:
return
now_ms = int(datetime.datetime.now().timestamp() * 1000)
rows = [(path, now_ms) for path in paths]
clickhouse.conn.insert(
database="signoz_metadata",
table="distributed_json_promoted_paths",
data=rows,
column_names=[
"path",
"created_at",
],
)
yield _export_promoted_paths
clickhouse.conn.query(
f"TRUNCATE TABLE signoz_metadata.json_promoted_paths ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
)

View File

@@ -122,6 +122,8 @@ class Logs(ABC):
resources: dict[str, Any] = {},
attributes: dict[str, Any] = {},
body: str = "default body",
body_v2: Optional[str] = None,
body_promoted: Optional[str] = None,
severity_text: str = "INFO",
trace_id: str = "",
span_id: str = "",
@@ -167,6 +169,33 @@ class Logs(ABC):
# Set body
self.body = body
# Set body_v2 - if body is JSON, parse and stringify it, otherwise use empty string
# ClickHouse accepts String input for JSON column
if body_v2 is not None:
self.body_v2 = body_v2
else:
# Try to parse body as JSON; if successful use it directly,
# otherwise wrap as {"message": body} matching the normalize operator behavior.
try:
json.loads(body)
self.body_v2 = body
except (json.JSONDecodeError, TypeError):
self.body_v2 = json.dumps({"message": body})
# Set body_promoted - must be valid JSON
# Tests will explicitly pass promoted column's content, but we validate it
if body_promoted is not None:
# Validate that it's valid JSON
try:
json.loads(body_promoted)
self.body_promoted = body_promoted
except (json.JSONDecodeError, TypeError):
# If invalid, default to empty JSON object
self.body_promoted = "{}"
else:
# Default to empty JSON object (valid JSON)
self.body_promoted = "{}"
# Process resources and attributes
self.resources_string = {k: str(v) for k, v in resources.items()}
self.resource_json = (
@@ -326,6 +355,8 @@ class Logs(ABC):
self.severity_text,
self.severity_number,
self.body,
self.body_v2,
self.body_promoted,
self.attributes_string,
self.attributes_number,
self.attributes_bool,
@@ -454,31 +485,53 @@ def insert_logs(
data=[resource_key.np_arr() for resource_key in resource_keys],
)
# All columns in insertion order (must match Logs.np_arr() order)
all_column_names = [
"ts_bucket_start",
"resource_fingerprint",
"timestamp",
"observed_timestamp",
"id",
"trace_id",
"span_id",
"trace_flags",
"severity_text",
"severity_number",
"body",
"body_v2",
"body_promoted",
"attributes_string",
"attributes_number",
"attributes_bool",
"resources_string",
"scope_name",
"scope_version",
"scope_string",
"resource",
]
# Check if body_v2 column exists (only present when ENABLE_LOGS_MIGRATIONS_V2 migration has run)
result = clickhouse.conn.query(
"SELECT count() FROM system.columns WHERE database = 'signoz_logs' AND table = 'logs_v2' AND name = 'body_v2'"
)
has_json_body = result.result_rows[0][0] > 0
if has_json_body:
column_names = all_column_names
data = [log.np_arr() for log in logs]
else:
json_body_cols = {"body_v2", "body_promoted"}
keep_indices = [
i for i, c in enumerate(all_column_names) if c not in json_body_cols
]
column_names = [all_column_names[i] for i in keep_indices]
data = [log.np_arr()[keep_indices] for log in logs]
clickhouse.conn.insert(
database="signoz_logs",
table="distributed_logs_v2",
data=[log.np_arr() for log in logs],
column_names=[
"ts_bucket_start",
"resource_fingerprint",
"timestamp",
"observed_timestamp",
"id",
"trace_id",
"span_id",
"trace_flags",
"severity_text",
"severity_number",
"body",
"attributes_string",
"attributes_number",
"attributes_bool",
"resources_string",
"scope_name",
"scope_version",
"scope_string",
"resource",
],
data=data,
column_names=column_names,
)
yield _insert_logs

View File

@@ -1,3 +1,5 @@
from typing import Optional
import docker
import pytest
from testcontainers.core.container import Network
@@ -8,27 +10,32 @@ from fixtures.logger import setup_logger
logger = setup_logger(__name__)
@pytest.fixture(name="migrator", scope="package")
def migrator(
def create_migrator(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
cache_key: str = "migrator",
env_overrides: Optional[dict] = None,
) -> types.Operation:
"""
Package-scoped fixture for running schema migrations.
Factory function for running schema migrations.
Accepts optional env_overrides to customize the migrator environment.
"""
def create() -> None:
version = request.config.getoption("--schema-migrator-version")
client = docker.from_env()
environment = dict(env_overrides) if env_overrides else {}
container = client.containers.run(
image=f"signoz/signoz-schema-migrator:{version}",
command=f"sync --replication=true --cluster-name=cluster --up= --dsn={clickhouse.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN"]}",
detach=True,
auto_remove=False,
network=network.id,
environment=environment,
)
result = container.wait()
@@ -47,6 +54,7 @@ def migrator(
detach=True,
auto_remove=False,
network=network.id,
environment=environment,
)
result = container.wait()
@@ -59,7 +67,7 @@ def migrator(
container.remove()
return types.Operation(name="migrator")
return types.Operation(name=cache_key)
def delete(_: types.Operation) -> None:
pass
@@ -70,9 +78,27 @@ def migrator(
return dev.wrap(
request,
pytestconfig,
"migrator",
cache_key,
lambda: types.Operation(name=""),
create,
delete,
restore,
)
@pytest.fixture(name="migrator", scope="package")
def migrator(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
) -> types.Operation:
"""
Package-scoped fixture for running schema migrations.
"""
return create_migrator(
network=network,
clickhouse=clickhouse,
request=request,
pytestconfig=pytestconfig,
)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,70 @@
import pytest
from testcontainers.core.container import Network
from fixtures import types
from fixtures.migrator import create_migrator
from fixtures.signoz import create_signoz
UNSUPPORTED_CLICKHOUSE_VERSIONS = {"25.5.6"}
def pytest_collection_modifyitems(
config: pytest.Config, items: list[pytest.Item]
) -> None:
version = config.getoption("--clickhouse-version")
if version in UNSUPPORTED_CLICKHOUSE_VERSIONS:
skip = pytest.mark.skip(
reason=f"JSON body QB tests require ClickHouse > {version}"
)
for item in items:
item.add_marker(skip)
@pytest.fixture(name="migrator", scope="package")
def migrator_json(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
) -> types.Operation:
"""
Package-scoped migrator with ENABLE_LOGS_MIGRATIONS_V2=1.
"""
return create_migrator(
network=network,
clickhouse=clickhouse,
request=request,
pytestconfig=pytestconfig,
cache_key="migrator-json-body",
env_overrides={
"ENABLE_LOGS_MIGRATIONS_V2": "1",
},
)
@pytest.fixture(name="signoz", scope="package")
def signoz_json_body(
network: Network,
zeus: types.TestContainerDocker,
gateway: types.TestContainerDocker,
sqlstore: types.TestContainerSQL,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
) -> types.SigNoz:
"""
Package-scoped fixture for SigNoz with BODY_JSON_QUERY_ENABLED=true.
"""
return create_signoz(
network=network,
zeus=zeus,
gateway=gateway,
sqlstore=sqlstore,
clickhouse=clickhouse,
request=request,
pytestconfig=pytestconfig,
cache_key="signoz-json-body",
env_overrides={
"BODY_JSON_QUERY_ENABLED": "true",
},
)