mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-24 17:23:19 +00:00
Compare commits
2 Commits
json-qb-in
...
pipelinesv
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ae55a51d66 | ||
|
|
023e315a22 |
12
pkg/apiserver/signozapiserver/logspipeline.go
Normal file
12
pkg/apiserver/signozapiserver/logspipeline.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package signozapiserver
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
func (provider *provider) addLogspipelineRoutes(router *mux.Router) error {
|
||||
router.HandleFunc("/api/v2/pipelines", provider.logspipelineHandler.ListPipelines).Methods(http.MethodGet)
|
||||
return nil
|
||||
}
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/fields"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||
@@ -48,6 +49,7 @@ type provider struct {
|
||||
authzHandler authz.Handler
|
||||
zeusHandler zeus.Handler
|
||||
querierHandler querier.Handler
|
||||
logspipelineHandler logspipeline.Handler
|
||||
}
|
||||
|
||||
func NewFactory(
|
||||
@@ -69,6 +71,7 @@ func NewFactory(
|
||||
authzHandler authz.Handler,
|
||||
zeusHandler zeus.Handler,
|
||||
querierHandler querier.Handler,
|
||||
logspipelineHandler logspipeline.Handler,
|
||||
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
|
||||
return newProvider(
|
||||
@@ -93,6 +96,7 @@ func NewFactory(
|
||||
authzHandler,
|
||||
zeusHandler,
|
||||
querierHandler,
|
||||
logspipelineHandler,
|
||||
)
|
||||
})
|
||||
}
|
||||
@@ -119,6 +123,7 @@ func newProvider(
|
||||
authzHandler authz.Handler,
|
||||
zeusHandler zeus.Handler,
|
||||
querierHandler querier.Handler,
|
||||
logspipelineHandler logspipeline.Handler,
|
||||
) (apiserver.APIServer, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
|
||||
router := mux.NewRouter().UseEncodedPath()
|
||||
@@ -143,6 +148,7 @@ func newProvider(
|
||||
authzHandler: authzHandler,
|
||||
zeusHandler: zeusHandler,
|
||||
querierHandler: querierHandler,
|
||||
logspipelineHandler: logspipelineHandler,
|
||||
}
|
||||
|
||||
provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
|
||||
@@ -223,9 +229,14 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := provider.addLogspipelineRoutes(router); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
func newSecuritySchemes(role types.Role) []handler.OpenAPISecurityScheme {
|
||||
return []handler.OpenAPISecurityScheme{
|
||||
{Name: ctxtypes.AuthTypeAPIKey.StringValue(), Scopes: []string{role.String()}},
|
||||
|
||||
@@ -18,17 +18,13 @@ type responseerroradditional struct {
|
||||
|
||||
func AsJSON(cause error) *JSON {
|
||||
// See if this is an instance of the base error or not
|
||||
_, c, m, err, u, a := Unwrapb(cause)
|
||||
_, c, m, _, u, a := Unwrapb(cause)
|
||||
|
||||
rea := make([]responseerroradditional, len(a))
|
||||
for k, v := range a {
|
||||
rea[k] = responseerroradditional{v}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
rea = append(rea, responseerroradditional{err.Error()})
|
||||
}
|
||||
|
||||
return &JSON{
|
||||
Code: c.String(),
|
||||
Message: m,
|
||||
|
||||
91
pkg/modules/logspipeline/impllogspipeline/handler.go
Normal file
91
pkg/modules/logspipeline/impllogspipeline/handler.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package impllogspipeline
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
module logspipeline.Module
|
||||
}
|
||||
|
||||
func NewHandler(module logspipeline.Module) logspipeline.Handler {
|
||||
return &handler{module: module}
|
||||
}
|
||||
|
||||
func (h *handler) ListPipelines(w http.ResponseWriter, r *http.Request) {
|
||||
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID, errv2 := valuer.NewUUID(claims.OrgID)
|
||||
if errv2 != nil {
|
||||
render.Error(w, errv2)
|
||||
return
|
||||
}
|
||||
|
||||
version, err := ParseAgentConfigVersion(r)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
if version != -1 {
|
||||
pipelines, err := h.module.ListPipelinesByVersion(r.Context(), orgID, version)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
render.Success(w, http.StatusOK, pipelines)
|
||||
return
|
||||
}
|
||||
|
||||
pipelines, err := h.module.ListPipelines(r.Context(), orgID)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(w, http.StatusOK, pipelines)
|
||||
}
|
||||
|
||||
func (h *handler) GetPipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *handler) CreatePipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *handler) UpdatePipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *handler) DeletePipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func ParseAgentConfigVersion(r *http.Request) (int, error) {
|
||||
versionString := mux.Vars(r)["version"]
|
||||
|
||||
if versionString == "latest" {
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
version64, err := strconv.ParseInt(versionString, 0, 8)
|
||||
|
||||
if err != nil {
|
||||
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
if version64 <= 0 {
|
||||
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
return int(version64), nil
|
||||
}
|
||||
230
pkg/modules/logspipeline/impllogspipeline/module.go
Normal file
230
pkg/modules/logspipeline/impllogspipeline/module.go
Normal file
@@ -0,0 +1,230 @@
|
||||
package impllogspipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/uptrace/bun"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type module struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewModule(sqlstore sqlstore.SQLStore) logspipeline.Module {
|
||||
return &module{sqlstore: sqlstore}
|
||||
}
|
||||
|
||||
func (m *module) ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error) {
|
||||
latestVersion := -1
|
||||
// get latest agent config
|
||||
lastestConfig, err := agentConf.GetLatestVersion(ctx, orgID, opamptypes.ElementTypeLogPipelines)
|
||||
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if lastestConfig != nil {
|
||||
latestVersion = lastestConfig.Version
|
||||
}
|
||||
return m.ListPipelinesByVersion(ctx, orgID, latestVersion)
|
||||
}
|
||||
|
||||
func (m *module) ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error) {
|
||||
var stored []pipelinetypes.StoreablePipeline
|
||||
err := m.sqlstore.BunDB().NewSelect().
|
||||
Model(&stored).
|
||||
Join("JOIN agent_config_element e ON p.id = e.element_id").
|
||||
Join("JOIN agent_config_version v ON v.id = e.version_id").
|
||||
Where("e.element_type = ?", opamptypes.ElementTypeLogPipelines.StringValue()).
|
||||
Where("v.version = ?", version).
|
||||
Where("v.org_id = ?", orgID.StringValue()).
|
||||
Order("p.order_id ASC").
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pipelines := make([]pipelinetypes.GettablePipeline, len(stored))
|
||||
if len(stored) == 0 {
|
||||
return pipelines, nil
|
||||
}
|
||||
|
||||
for i := range stored {
|
||||
pipelines[i].StoreablePipeline = stored[i]
|
||||
if err := pipelines[i].ParseRawConfig(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := pipelines[i].ParseFilter(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return pipelines, nil
|
||||
}
|
||||
|
||||
func (m *module) GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *module) CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
|
||||
storeable, err := pipeline.ToStoreablePipeline()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// regenerate the id and set other fields
|
||||
storeable.Identifiable.ID = valuer.GenerateUUID()
|
||||
storeable.OrgID = orgID.String()
|
||||
storeable.TimeAuditable = types.TimeAuditable{
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
storeable.UserAuditable = types.UserAuditable{
|
||||
CreatedBy: claims.Email,
|
||||
}
|
||||
|
||||
_, err = m.sqlstore.BunDB().NewInsert().
|
||||
Model(&storeable).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
zap.L().Error("error in inserting pipeline data", zap.Error(err))
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to insert pipeline")
|
||||
}
|
||||
|
||||
return &pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: *storeable,
|
||||
Filter: pipeline.Filter,
|
||||
Config: pipeline.Config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *module) UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
|
||||
if err := pipeline.IsValid(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
storeable, err := pipeline.ToStoreablePipeline()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
storeable.OrgID = orgID.String()
|
||||
storeable.TimeAuditable = types.TimeAuditable{
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
storeable.UserAuditable = types.UserAuditable{
|
||||
UpdatedBy: claims.Email,
|
||||
}
|
||||
|
||||
// get id from storeable pipeline
|
||||
id := storeable.ID.StringValue()
|
||||
|
||||
// depending on the order_id update the rest of the table
|
||||
// example 1: total available pipelines are 6, and order_id 5 is moved to 2, then we need to update the rest of the table
|
||||
// old: 1, 2, 3, 4, 5, 6
|
||||
// ^ |
|
||||
// |_________|
|
||||
// So pipelines starting from 2nd position till 4th position shift to right (or increase their order_id) by 1 position
|
||||
// example 2: total available pipelines are 6, and order_id 2 is moved to 4, then we need to update the rest of the table
|
||||
// old: 1, 2, 3, 4, 5, 6
|
||||
// | ^
|
||||
// |_____|
|
||||
// So pipelines starting from 3rd position till 4th position shift to left (or decrease their order_id) by 1 position
|
||||
if err := m.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
|
||||
db := m.sqlstore.BunDBCtx(ctx)
|
||||
|
||||
var existing pipelinetypes.StoreablePipeline
|
||||
if err := db.NewSelect().
|
||||
Model(&existing).
|
||||
Where("id = ?", id).
|
||||
Where("org_id = ?", orgID.StringValue()).
|
||||
Scan(ctx); err != nil {
|
||||
return m.sqlstore.WrapNotFoundErrf(
|
||||
err,
|
||||
errors.CodeNotFound,
|
||||
"pipeline with id %s does not exist in org %s",
|
||||
id,
|
||||
orgID.StringValue(),
|
||||
)
|
||||
}
|
||||
|
||||
oldOrderID := existing.OrderID
|
||||
newOrderID := storeable.OrderID
|
||||
|
||||
// Reorder other pipelines if the order has changed.
|
||||
if newOrderID != oldOrderID {
|
||||
if err := reorderPipelinesInTx(ctx, db, orgID.StringValue(), oldOrderID, newOrderID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve primary key and immutable fields.
|
||||
storeable.ID = existing.ID
|
||||
|
||||
// Persist the updated pipeline (including its new order).
|
||||
if _, err := db.NewUpdate().
|
||||
Model(storeable).
|
||||
Where("id = ?", id).
|
||||
Where("org_id = ?", orgID.StringValue()).
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: *storeable,
|
||||
Filter: pipeline.Filter,
|
||||
Config: pipeline.Config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// reorderPipelinesInTx updates order_id of other pipelines in a transaction-aware way.
|
||||
// It assumes that all pipelines for a given org have consecutive order_id values starting from 1.
|
||||
// The logic is:
|
||||
// - When moving a pipeline from a higher position to a lower position (e.g., 5 -> 2),
|
||||
// all pipelines in [newOrderID, oldOrderID) are shifted right by +1.
|
||||
// - When moving from a lower position to a higher position (e.g., 2 -> 4),
|
||||
// all pipelines in (oldOrderID, newOrderID] are shifted left by -1.
|
||||
func reorderPipelinesInTx(ctx context.Context, tx bun.IDB, orgID string, oldOrderID, newOrderID int) error {
|
||||
switch {
|
||||
case newOrderID < oldOrderID:
|
||||
// Move up: shift affected pipelines down (order_id + 1).
|
||||
_, err := tx.NewUpdate().
|
||||
Model((*pipelinetypes.StoreablePipeline)(nil)).
|
||||
Set("order_id = order_id + 1").
|
||||
Where("org_id = ?", orgID).
|
||||
Where("order_id >= ?", newOrderID).
|
||||
Where("order_id < ?", oldOrderID).
|
||||
Exec(ctx)
|
||||
return err
|
||||
case newOrderID > oldOrderID:
|
||||
// Move down: shift affected pipelines up (order_id - 1).
|
||||
_, err := tx.NewUpdate().
|
||||
Model((*pipelinetypes.StoreablePipeline)(nil)).
|
||||
Set("order_id = order_id - 1").
|
||||
Where("org_id = ?", orgID).
|
||||
Where("order_id > ?", oldOrderID).
|
||||
Where("order_id <= ?", newOrderID).
|
||||
Exec(ctx)
|
||||
return err
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (m *module) DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error {
|
||||
return nil
|
||||
}
|
||||
27
pkg/modules/logspipeline/logspipeline.go
Normal file
27
pkg/modules/logspipeline/logspipeline.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package logspipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Module interface {
|
||||
ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error)
|
||||
ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error)
|
||||
GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error)
|
||||
CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
|
||||
UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
|
||||
DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error
|
||||
}
|
||||
|
||||
type Handler interface {
|
||||
ListPipelines(w http.ResponseWriter, r *http.Request)
|
||||
GetPipeline(w http.ResponseWriter, r *http.Request)
|
||||
CreatePipeline(w http.ResponseWriter, r *http.Request)
|
||||
UpdatePipeline(w http.ResponseWriter, r *http.Request)
|
||||
DeletePipeline(w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser"
|
||||
|
||||
@@ -4048,26 +4049,6 @@ func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
|
||||
aH.WriteJSON(w, r, model.GetLogsAggregatesResponse{})
|
||||
}
|
||||
|
||||
func parseAgentConfigVersion(r *http.Request) (int, error) {
|
||||
versionString := mux.Vars(r)["version"]
|
||||
|
||||
if versionString == "latest" {
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
version64, err := strconv.ParseInt(versionString, 0, 8)
|
||||
|
||||
if err != nil {
|
||||
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
if version64 <= 0 {
|
||||
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
return int(version64), nil
|
||||
}
|
||||
|
||||
func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
|
||||
req := logparsingpipeline.PipelinesPreviewRequest{}
|
||||
|
||||
@@ -4098,7 +4079,7 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
|
||||
return
|
||||
}
|
||||
|
||||
version, err := parseAgentConfigVersion(r)
|
||||
version, err := impllogspipeline.ParseAgentConfigVersion(r)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
|
||||
@@ -13,6 +13,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
@@ -66,6 +68,7 @@ type Modules struct {
|
||||
SpanPercentile spanpercentile.Module
|
||||
MetricsExplorer metricsexplorer.Module
|
||||
Promote promote.Module
|
||||
LogsPipeline logspipeline.Module
|
||||
}
|
||||
|
||||
func NewModules(
|
||||
@@ -110,5 +113,6 @@ func NewModules(
|
||||
Services: implservices.NewModule(querier, telemetryStore),
|
||||
MetricsExplorer: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, ruleStore, dashboard, providerSettings, config.MetricsExplorer),
|
||||
Promote: implpromote.NewModule(telemetryMetadataStore, telemetryStore),
|
||||
LogsPipeline: impllogspipeline.NewModule(sqlstore),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/fields"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||
@@ -59,6 +60,7 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
|
||||
struct{ authz.Handler }{},
|
||||
struct{ zeus.Handler }{},
|
||||
struct{ querier.Handler }{},
|
||||
struct{ logspipeline.Handler }{},
|
||||
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/global/signozglobal"
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
|
||||
@@ -254,6 +255,7 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
|
||||
handlers.AuthzHandler,
|
||||
handlers.ZeusHandler,
|
||||
handlers.QuerierHandler,
|
||||
impllogspipeline.NewHandler(modules.LogsPipeline),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
@@ -266,6 +267,37 @@ func (p *PostablePipeline) IsValid() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PostablePipeline) ToStoreablePipeline() (*StoreablePipeline, error) {
|
||||
rawConfig, err := json.Marshal(p.Config)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to unmarshal postable pipeline config")
|
||||
}
|
||||
|
||||
filter, err := json.Marshal(p.Filter)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to marshal postable pipeline filter")
|
||||
}
|
||||
identifier := valuer.GenerateUUID()
|
||||
if p.ID != "" {
|
||||
identifier, err = valuer.NewUUID(p.ID)
|
||||
if err != nil {
|
||||
return nil, errors.WithAdditionalf(err, "failed to parse postable pipeline id")
|
||||
}
|
||||
}
|
||||
return &StoreablePipeline{
|
||||
Identifiable: types.Identifiable{
|
||||
ID: identifier,
|
||||
},
|
||||
OrderID: p.OrderID,
|
||||
Enabled: p.Enabled,
|
||||
Name: p.Name,
|
||||
Alias: p.Alias,
|
||||
Description: p.Description,
|
||||
FilterString: string(filter),
|
||||
ConfigJSON: string(rawConfig),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func isValidOperator(op PipelineOperator) error {
|
||||
if op.ID == "" {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "PipelineOperator.ID is required")
|
||||
|
||||
@@ -1,19 +1,5 @@
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
if not os.environ.get("DOCKER_CONFIG"):
|
||||
os.environ["DOCKER_CONFIG"] = tempfile.mkdtemp(prefix="docker-config-")
|
||||
os.environ.setdefault("DOCKER_CREDENTIAL_STORE", "")
|
||||
os.environ.setdefault("DOCKER_CREDENTIAL_HELPER", "")
|
||||
|
||||
docker_config_path = os.path.join(os.environ["DOCKER_CONFIG"], "config.json")
|
||||
if not os.path.exists(docker_config_path):
|
||||
with open(docker_config_path, "w", encoding="utf-8") as config_file:
|
||||
json.dump({"auths": {}, "credsStore": ""}, config_file)
|
||||
|
||||
pytest_plugins = [
|
||||
"fixtures.auth",
|
||||
"fixtures.clickhouse",
|
||||
@@ -35,7 +21,6 @@ pytest_plugins = [
|
||||
"fixtures.notification_channel",
|
||||
"fixtures.alerts",
|
||||
"fixtures.cloudintegrations",
|
||||
"fixtures.jsontypeexporter",
|
||||
]
|
||||
|
||||
|
||||
@@ -73,7 +58,7 @@ def pytest_addoption(parser: pytest.Parser):
|
||||
parser.addoption(
|
||||
"--clickhouse-version",
|
||||
action="store",
|
||||
default="25.8.6",
|
||||
default="25.5.6",
|
||||
help="clickhouse version",
|
||||
)
|
||||
parser.addoption(
|
||||
@@ -88,5 +73,3 @@ def pytest_addoption(parser: pytest.Parser):
|
||||
default="v0.129.7",
|
||||
help="schema migrator version",
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -1,408 +0,0 @@
|
||||
"""
|
||||
Simpler version of jsontypeexporter for test fixtures.
|
||||
This exports JSON type metadata to the path_types table by parsing JSON bodies
|
||||
and extracting all paths with their types, similar to how the real jsontypeexporter works.
|
||||
"""
|
||||
import datetime
|
||||
import json
|
||||
from abc import ABC
|
||||
from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, List, Optional, Set, Union
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from fixtures import types
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.logs import Logs
|
||||
|
||||
|
||||
class JSONPathType(ABC):
|
||||
"""Represents a JSON path with its type information"""
|
||||
path: str
|
||||
type: str
|
||||
last_seen: np.uint64
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
path: str,
|
||||
type: str, # pylint: disable=redefined-builtin
|
||||
last_seen: Optional[datetime.datetime] = None,
|
||||
) -> None:
|
||||
self.path = path
|
||||
self.type = type
|
||||
if last_seen is None:
|
||||
last_seen = datetime.datetime.now()
|
||||
self.last_seen = np.uint64(int(last_seen.timestamp() * 1e9))
|
||||
|
||||
def np_arr(self) -> np.array:
|
||||
"""Return path type data as numpy array for database insertion"""
|
||||
return np.array([self.path, self.type, self.last_seen])
|
||||
|
||||
|
||||
# Constants matching jsontypeexporter
|
||||
ARRAY_SEPARATOR = "[]." # Used in paths like "education[].name"
|
||||
ARRAY_SUFFIX = "[]" # Used when traversing into array element objects
|
||||
|
||||
|
||||
def _infer_array_type(elements: List[Any]) -> Optional[str]:
|
||||
"""
|
||||
Infer array type from array elements, matching jsontypeexporter's inferArrayMask logic.
|
||||
Returns None if no valid type can be inferred.
|
||||
"""
|
||||
if len(elements) == 0:
|
||||
return None
|
||||
|
||||
# Collect element types (matching Go: types := make([]pcommon.ValueType, 0, s.Len()))
|
||||
types = []
|
||||
for elem in elements:
|
||||
if elem is None:
|
||||
continue
|
||||
if isinstance(elem, dict):
|
||||
types.append("JSON")
|
||||
elif isinstance(elem, str):
|
||||
types.append("String")
|
||||
elif isinstance(elem, bool):
|
||||
types.append("Bool")
|
||||
elif isinstance(elem, float):
|
||||
types.append("Float64")
|
||||
elif isinstance(elem, int):
|
||||
types.append("Int64")
|
||||
|
||||
if len(types) == 0:
|
||||
return None
|
||||
|
||||
# Get unique types (matching Go: unique := make(map[pcommon.ValueType]struct{}, len(types)))
|
||||
unique = set(types)
|
||||
|
||||
# Classify types (matching Go logic)
|
||||
has_json = "JSON" in unique
|
||||
has_primitive = any(t in unique for t in ["String", "Bool", "Float64", "Int64"])
|
||||
|
||||
if has_json:
|
||||
# If only JSON → Array(JSON) (no primitive types)
|
||||
if not has_primitive:
|
||||
return "Array(JSON)"
|
||||
# If there's JSON + any primitive → Dynamic
|
||||
return "Array(Dynamic)"
|
||||
|
||||
# ---- Primitive Type Resolution ----
|
||||
if "String" in unique:
|
||||
# Strings cannot coerce with any other primitive
|
||||
if len(unique) > 1:
|
||||
return "Array(Dynamic)"
|
||||
return "Array(Nullable(String))"
|
||||
|
||||
if "Float64" in unique:
|
||||
return "Array(Nullable(Float64))"
|
||||
if "Int64" in unique:
|
||||
return "Array(Nullable(Int64))"
|
||||
if "Bool" in unique:
|
||||
return "Array(Nullable(Bool))"
|
||||
|
||||
return "Array(Dynamic)"
|
||||
|
||||
|
||||
def _python_type_to_clickhouse_type(value: Any) -> str:
|
||||
"""
|
||||
Convert Python type to ClickHouse JSON type string.
|
||||
Maps Python types to ClickHouse JSON data types.
|
||||
"""
|
||||
if value is None:
|
||||
return "String" # Default for null values
|
||||
|
||||
if isinstance(value, bool):
|
||||
return "Bool"
|
||||
elif isinstance(value, int):
|
||||
return "Int64"
|
||||
elif isinstance(value, float):
|
||||
return "Float64"
|
||||
elif isinstance(value, str):
|
||||
return "String"
|
||||
elif isinstance(value, list):
|
||||
# Use the sophisticated array type inference
|
||||
array_type = _infer_array_type(value)
|
||||
return array_type if array_type else "Array(Dynamic)"
|
||||
elif isinstance(value, dict):
|
||||
return "JSON"
|
||||
else:
|
||||
return "String" # Default fallback
|
||||
|
||||
|
||||
def _extract_json_paths(
|
||||
obj: Any,
|
||||
current_path: str = "",
|
||||
path_types: Optional[Dict[str, Set[str]]] = None,
|
||||
level: int = 0,
|
||||
) -> Dict[str, Set[str]]:
|
||||
"""
|
||||
Recursively extract all paths and their types from a JSON object.
|
||||
Matches jsontypeexporter's analyzePValue logic.
|
||||
|
||||
Args:
|
||||
obj: The JSON object to traverse
|
||||
current_path: Current path being built (e.g., "user.name")
|
||||
path_types: Dictionary mapping paths to sets of types found
|
||||
level: Current nesting level (for depth limiting)
|
||||
|
||||
Returns:
|
||||
Dictionary mapping paths to sets of type strings
|
||||
"""
|
||||
if path_types is None:
|
||||
path_types = {}
|
||||
|
||||
if obj is None:
|
||||
if current_path:
|
||||
if current_path not in path_types:
|
||||
path_types[current_path] = set()
|
||||
path_types[current_path].add("String") # Null defaults to String
|
||||
return path_types
|
||||
|
||||
if isinstance(obj, dict):
|
||||
# For objects, add the object itself and recurse into keys
|
||||
if current_path:
|
||||
if current_path not in path_types:
|
||||
path_types[current_path] = set()
|
||||
path_types[current_path].add("JSON")
|
||||
|
||||
for key, value in obj.items():
|
||||
# Build the path for this key
|
||||
if current_path:
|
||||
new_path = f"{current_path}.{key}"
|
||||
else:
|
||||
new_path = key
|
||||
|
||||
# Recurse into the value
|
||||
_extract_json_paths(value, new_path, path_types, level + 1)
|
||||
|
||||
elif isinstance(obj, list):
|
||||
# Skip empty arrays
|
||||
if len(obj) == 0:
|
||||
return path_types
|
||||
|
||||
# Collect types from array elements (matching Go: types := make([]pcommon.ValueType, 0, s.Len()))
|
||||
types = []
|
||||
|
||||
for item in obj:
|
||||
if isinstance(item, dict):
|
||||
# When traversing into array element objects, use ArraySuffix ([])
|
||||
# This matches: prefix+ArraySuffix in the Go code
|
||||
# Example: if current_path is "education", we use "education[]" to traverse into objects
|
||||
array_prefix = current_path + ARRAY_SUFFIX if current_path else ""
|
||||
for key, value in item.items():
|
||||
if array_prefix:
|
||||
# Use array separator: education[].name
|
||||
array_path = f"{array_prefix}.{key}"
|
||||
else:
|
||||
array_path = key
|
||||
# Recurse without increasing level (matching Go behavior)
|
||||
_extract_json_paths(value, array_path, path_types, level)
|
||||
types.append("JSON")
|
||||
elif isinstance(item, list):
|
||||
# Arrays inside arrays are not supported - skip the whole path
|
||||
# Matching Go: e.logger.Error("arrays inside arrays are not supported!", ...); return nil
|
||||
return path_types
|
||||
elif isinstance(item, str):
|
||||
types.append("String")
|
||||
elif isinstance(item, bool):
|
||||
types.append("Bool")
|
||||
elif isinstance(item, float):
|
||||
types.append("Float64")
|
||||
elif isinstance(item, int):
|
||||
types.append("Int64")
|
||||
|
||||
# Infer array type from collected types (matching Go: if mask := inferArrayMask(types); mask != 0)
|
||||
if len(types) > 0:
|
||||
array_type = _infer_array_type(obj)
|
||||
if array_type and current_path:
|
||||
if current_path not in path_types:
|
||||
path_types[current_path] = set()
|
||||
path_types[current_path].add(array_type)
|
||||
|
||||
else:
|
||||
# Primitive value (string, number, bool)
|
||||
if current_path:
|
||||
if current_path not in path_types:
|
||||
path_types[current_path] = set()
|
||||
obj_type = _python_type_to_clickhouse_type(obj)
|
||||
path_types[current_path].add(obj_type)
|
||||
|
||||
return path_types
|
||||
|
||||
|
||||
def _parse_json_bodies_and_extract_paths(
|
||||
json_bodies: List[str],
|
||||
timestamp: Optional[datetime.datetime] = None,
|
||||
) -> List[JSONPathType]:
|
||||
"""
|
||||
Parse JSON bodies and extract all paths with their types.
|
||||
This mimics the behavior of jsontypeexporter.
|
||||
|
||||
Args:
|
||||
json_bodies: List of JSON body strings to parse
|
||||
timestamp: Timestamp to use for last_seen (defaults to now)
|
||||
|
||||
Returns:
|
||||
List of JSONPathType objects with all discovered paths and types
|
||||
"""
|
||||
if timestamp is None:
|
||||
timestamp = datetime.datetime.now()
|
||||
|
||||
# Aggregate all paths and their types across all JSON bodies
|
||||
all_path_types: Dict[str, Set[str]] = {}
|
||||
|
||||
for json_body in json_bodies:
|
||||
try:
|
||||
parsed = json.loads(json_body)
|
||||
_extract_json_paths(parsed, "", all_path_types, level=0)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
# Skip invalid JSON
|
||||
continue
|
||||
|
||||
# Convert to list of JSONPathType objects
|
||||
# Each path can have multiple types, so we create one JSONPathType per type
|
||||
path_type_objects: List[JSONPathType] = []
|
||||
for path, types_set in all_path_types.items():
|
||||
for type_str in types_set:
|
||||
path_type_objects.append(
|
||||
JSONPathType(path=path, type=type_str, last_seen=timestamp)
|
||||
)
|
||||
|
||||
return path_type_objects
|
||||
|
||||
|
||||
@pytest.fixture(name="export_json_types", scope="function")
|
||||
def export_json_types(
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
request: pytest.FixtureRequest, # To access migrator fixture
|
||||
) -> Generator[Callable[[Union[List[JSONPathType], List[str], List[Any]]], None], Any, None]:
|
||||
"""
|
||||
Fixture for exporting JSON type metadata to the path_types table.
|
||||
This is a simpler version of jsontypeexporter for test fixtures.
|
||||
|
||||
The function can accept:
|
||||
1. List of JSONPathType objects (manual specification)
|
||||
2. List of JSON body strings (auto-extract paths)
|
||||
3. List of Logs objects (extract from body_json field)
|
||||
|
||||
Usage examples:
|
||||
# Manual specification
|
||||
export_json_types([
|
||||
JSONPathType(path="user.name", type="String"),
|
||||
JSONPathType(path="user.age", type="Int64"),
|
||||
])
|
||||
|
||||
# Auto-extract from JSON strings
|
||||
export_json_types([
|
||||
'{"user": {"name": "alice", "age": 25}}',
|
||||
'{"user": {"name": "bob", "age": 30}}',
|
||||
])
|
||||
|
||||
# Auto-extract from Logs objects
|
||||
export_json_types(logs_list)
|
||||
"""
|
||||
# Ensure migrator has run to create the table
|
||||
try:
|
||||
request.getfixturevalue("migrator")
|
||||
except Exception:
|
||||
# If migrator fixture is not available, that's okay - table might already exist
|
||||
pass
|
||||
|
||||
def _export_json_types(
|
||||
data: Union[List[JSONPathType], List[str], List[Any]], # List[Logs] but avoiding circular import
|
||||
) -> None:
|
||||
"""
|
||||
Export JSON type metadata to signoz_metadata.distributed_json_path_types table.
|
||||
This table stores path and type information for body JSON fields.
|
||||
"""
|
||||
path_types: List[JSONPathType] = []
|
||||
|
||||
if len(data) == 0:
|
||||
return
|
||||
|
||||
# Determine input type and convert to JSONPathType list
|
||||
first_item = data[0]
|
||||
|
||||
if isinstance(first_item, JSONPathType):
|
||||
# Already JSONPathType objects
|
||||
path_types = data # type: ignore
|
||||
elif isinstance(first_item, str):
|
||||
# List of JSON strings - parse and extract paths
|
||||
path_types = _parse_json_bodies_and_extract_paths(data) # type: ignore
|
||||
else:
|
||||
# Assume it's a list of Logs objects - extract body_json
|
||||
json_bodies: List[str] = []
|
||||
for log in data: # type: ignore
|
||||
# Try to get body_json attribute
|
||||
if hasattr(log, "body_json") and log.body_json:
|
||||
json_bodies.append(log.body_json)
|
||||
elif hasattr(log, "body") and log.body:
|
||||
# Fallback to body if body_json not available
|
||||
try:
|
||||
# Try to parse as JSON
|
||||
json.loads(log.body)
|
||||
json_bodies.append(log.body)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
|
||||
if json_bodies:
|
||||
path_types = _parse_json_bodies_and_extract_paths(json_bodies)
|
||||
|
||||
if len(path_types) == 0:
|
||||
return
|
||||
|
||||
clickhouse.conn.insert(
|
||||
database="signoz_metadata",
|
||||
table="distributed_json_path_types",
|
||||
data=[path_type.np_arr() for path_type in path_types],
|
||||
column_names=[
|
||||
"path",
|
||||
"type",
|
||||
"last_seen",
|
||||
],
|
||||
)
|
||||
|
||||
yield _export_json_types
|
||||
|
||||
# Cleanup - truncate the local table after tests (following pattern from logs fixture)
|
||||
clickhouse.conn.query(
|
||||
f"TRUNCATE TABLE signoz_metadata.json_path_types ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(name="export_promoted_paths", scope="function")
|
||||
def export_promoted_paths(
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
request: pytest.FixtureRequest, # To access migrator fixture
|
||||
) -> Generator[Callable[[List[str]], None], Any, None]:
|
||||
"""
|
||||
Fixture for exporting promoted JSON paths to the promoted paths table.
|
||||
"""
|
||||
# Ensure migrator has run to create the table
|
||||
try:
|
||||
request.getfixturevalue("migrator")
|
||||
except Exception:
|
||||
# If migrator fixture is not available, that's okay - table might already exist
|
||||
pass
|
||||
|
||||
def _export_promoted_paths(paths: List[str]) -> None:
|
||||
if len(paths) == 0:
|
||||
return
|
||||
|
||||
now_ms = int(datetime.datetime.now().timestamp() * 1000)
|
||||
rows = [(path, now_ms) for path in paths]
|
||||
clickhouse.conn.insert(
|
||||
database="signoz_metadata",
|
||||
table="distributed_json_promoted_paths",
|
||||
data=rows,
|
||||
column_names=[
|
||||
"path",
|
||||
"created_at",
|
||||
],
|
||||
)
|
||||
|
||||
yield _export_promoted_paths
|
||||
|
||||
clickhouse.conn.query(
|
||||
f"TRUNCATE TABLE signoz_metadata.json_promoted_paths ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
|
||||
)
|
||||
@@ -100,8 +100,6 @@ class Logs(ABC):
|
||||
severity_text: str
|
||||
severity_number: np.uint8
|
||||
body: str
|
||||
body_json: str
|
||||
body_json_promoted: str
|
||||
attributes_string: dict[str, str]
|
||||
attributes_number: dict[str, np.float64]
|
||||
attributes_bool: dict[str, bool]
|
||||
@@ -121,8 +119,6 @@ class Logs(ABC):
|
||||
resources: dict[str, Any] = {},
|
||||
attributes: dict[str, Any] = {},
|
||||
body: str = "default body",
|
||||
body_json: Optional[str] = None,
|
||||
body_json_promoted: Optional[str] = None,
|
||||
severity_text: str = "INFO",
|
||||
trace_id: str = "",
|
||||
span_id: str = "",
|
||||
@@ -166,32 +162,6 @@ class Logs(ABC):
|
||||
|
||||
# Set body
|
||||
self.body = body
|
||||
|
||||
# Set body_json - if body is JSON, parse and stringify it, otherwise use empty string
|
||||
# ClickHouse accepts String input for JSON column
|
||||
if body_json is not None:
|
||||
self.body_json = body_json
|
||||
else:
|
||||
# Try to parse body as JSON, if successful use it, otherwise use empty string
|
||||
try:
|
||||
json.loads(body)
|
||||
self.body_json = body
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
self.body_json = ""
|
||||
|
||||
# Set body_json_promoted - must be valid JSON
|
||||
# Tests will explicitly pass promoted column's content, but we validate it
|
||||
if body_json_promoted is not None:
|
||||
# Validate that it's valid JSON
|
||||
try:
|
||||
json.loads(body_json_promoted)
|
||||
self.body_json_promoted = body_json_promoted
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
# If invalid, default to empty JSON object
|
||||
self.body_json_promoted = "{}"
|
||||
else:
|
||||
# Default to empty JSON object (valid JSON)
|
||||
self.body_json_promoted = "{}"
|
||||
|
||||
# Process resources and attributes
|
||||
self.resources_string = {k: str(v) for k, v in resources.items()}
|
||||
@@ -349,8 +319,6 @@ class Logs(ABC):
|
||||
self.severity_text,
|
||||
self.severity_number,
|
||||
self.body,
|
||||
self.body_json,
|
||||
self.body_json_promoted,
|
||||
self.attributes_string,
|
||||
self.attributes_number,
|
||||
self.attributes_bool,
|
||||
@@ -495,8 +463,6 @@ def insert_logs(
|
||||
"severity_text",
|
||||
"severity_number",
|
||||
"body",
|
||||
"body_json",
|
||||
"body_json_promoted",
|
||||
"attributes_string",
|
||||
"attributes_number",
|
||||
"attributes_bool",
|
||||
|
||||
@@ -20,14 +20,12 @@ def migrator(
|
||||
"""
|
||||
|
||||
def create() -> None:
|
||||
# Hardcode version for new QB tests
|
||||
version = "v0.129.13-rc.1"
|
||||
version = request.config.getoption("--schema-migrator-version")
|
||||
client = docker.from_env()
|
||||
|
||||
container = client.containers.run(
|
||||
image=f"signoz/signoz-schema-migrator:{version}",
|
||||
command=f"sync --replication=true --cluster-name=cluster --up= --dsn={clickhouse.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN"]}",
|
||||
environment={"ENABLE_LOGS_MIGRATIONS_V2": "1"},
|
||||
detach=True,
|
||||
auto_remove=False,
|
||||
network=network.id,
|
||||
@@ -46,7 +44,6 @@ def migrator(
|
||||
container = client.containers.run(
|
||||
image=f"signoz/signoz-schema-migrator:{version}",
|
||||
command=f"async --replication=true --cluster-name=cluster --up= --dsn={clickhouse.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN"]}",
|
||||
environment={"ENABLE_LOGS_MIGRATIONS_V2": "1"},
|
||||
detach=True,
|
||||
auto_remove=False,
|
||||
network=network.id,
|
||||
|
||||
@@ -73,7 +73,6 @@ def signoz( # pylint: disable=too-many-arguments,too-many-positional-arguments
|
||||
"SIGNOZ_ALERTMANAGER_SIGNOZ_POLL__INTERVAL": "5s",
|
||||
"SIGNOZ_ALERTMANAGER_SIGNOZ_ROUTE_GROUP__WAIT": "1s",
|
||||
"SIGNOZ_ALERTMANAGER_SIGNOZ_ROUTE_GROUP__INTERVAL": "5s",
|
||||
"BODY_JSON_QUERY_ENABLED": "true",
|
||||
}
|
||||
| sqlstore.env
|
||||
| clickhouse.env
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user