mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-24 17:23:19 +00:00
Compare commits
2 Commits
fix/issue-
...
pipelinesv
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ae55a51d66 | ||
|
|
023e315a22 |
12
pkg/apiserver/signozapiserver/logspipeline.go
Normal file
12
pkg/apiserver/signozapiserver/logspipeline.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package signozapiserver
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
func (provider *provider) addLogspipelineRoutes(router *mux.Router) error {
|
||||
router.HandleFunc("/api/v2/pipelines", provider.logspipelineHandler.ListPipelines).Methods(http.MethodGet)
|
||||
return nil
|
||||
}
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/fields"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||
@@ -48,6 +49,7 @@ type provider struct {
|
||||
authzHandler authz.Handler
|
||||
zeusHandler zeus.Handler
|
||||
querierHandler querier.Handler
|
||||
logspipelineHandler logspipeline.Handler
|
||||
}
|
||||
|
||||
func NewFactory(
|
||||
@@ -69,6 +71,7 @@ func NewFactory(
|
||||
authzHandler authz.Handler,
|
||||
zeusHandler zeus.Handler,
|
||||
querierHandler querier.Handler,
|
||||
logspipelineHandler logspipeline.Handler,
|
||||
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
|
||||
return newProvider(
|
||||
@@ -93,6 +96,7 @@ func NewFactory(
|
||||
authzHandler,
|
||||
zeusHandler,
|
||||
querierHandler,
|
||||
logspipelineHandler,
|
||||
)
|
||||
})
|
||||
}
|
||||
@@ -119,6 +123,7 @@ func newProvider(
|
||||
authzHandler authz.Handler,
|
||||
zeusHandler zeus.Handler,
|
||||
querierHandler querier.Handler,
|
||||
logspipelineHandler logspipeline.Handler,
|
||||
) (apiserver.APIServer, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
|
||||
router := mux.NewRouter().UseEncodedPath()
|
||||
@@ -143,6 +148,7 @@ func newProvider(
|
||||
authzHandler: authzHandler,
|
||||
zeusHandler: zeusHandler,
|
||||
querierHandler: querierHandler,
|
||||
logspipelineHandler: logspipelineHandler,
|
||||
}
|
||||
|
||||
provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
|
||||
@@ -223,9 +229,14 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := provider.addLogspipelineRoutes(router); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
func newSecuritySchemes(role types.Role) []handler.OpenAPISecurityScheme {
|
||||
return []handler.OpenAPISecurityScheme{
|
||||
{Name: ctxtypes.AuthTypeAPIKey.StringValue(), Scopes: []string{role.String()}},
|
||||
|
||||
91
pkg/modules/logspipeline/impllogspipeline/handler.go
Normal file
91
pkg/modules/logspipeline/impllogspipeline/handler.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package impllogspipeline
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
module logspipeline.Module
|
||||
}
|
||||
|
||||
func NewHandler(module logspipeline.Module) logspipeline.Handler {
|
||||
return &handler{module: module}
|
||||
}
|
||||
|
||||
func (h *handler) ListPipelines(w http.ResponseWriter, r *http.Request) {
|
||||
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID, errv2 := valuer.NewUUID(claims.OrgID)
|
||||
if errv2 != nil {
|
||||
render.Error(w, errv2)
|
||||
return
|
||||
}
|
||||
|
||||
version, err := ParseAgentConfigVersion(r)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
if version != -1 {
|
||||
pipelines, err := h.module.ListPipelinesByVersion(r.Context(), orgID, version)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
render.Success(w, http.StatusOK, pipelines)
|
||||
return
|
||||
}
|
||||
|
||||
pipelines, err := h.module.ListPipelines(r.Context(), orgID)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(w, http.StatusOK, pipelines)
|
||||
}
|
||||
|
||||
func (h *handler) GetPipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *handler) CreatePipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *handler) UpdatePipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *handler) DeletePipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func ParseAgentConfigVersion(r *http.Request) (int, error) {
|
||||
versionString := mux.Vars(r)["version"]
|
||||
|
||||
if versionString == "latest" {
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
version64, err := strconv.ParseInt(versionString, 0, 8)
|
||||
|
||||
if err != nil {
|
||||
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
if version64 <= 0 {
|
||||
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
return int(version64), nil
|
||||
}
|
||||
230
pkg/modules/logspipeline/impllogspipeline/module.go
Normal file
230
pkg/modules/logspipeline/impllogspipeline/module.go
Normal file
@@ -0,0 +1,230 @@
|
||||
package impllogspipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/uptrace/bun"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type module struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewModule(sqlstore sqlstore.SQLStore) logspipeline.Module {
|
||||
return &module{sqlstore: sqlstore}
|
||||
}
|
||||
|
||||
func (m *module) ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error) {
|
||||
latestVersion := -1
|
||||
// get latest agent config
|
||||
lastestConfig, err := agentConf.GetLatestVersion(ctx, orgID, opamptypes.ElementTypeLogPipelines)
|
||||
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if lastestConfig != nil {
|
||||
latestVersion = lastestConfig.Version
|
||||
}
|
||||
return m.ListPipelinesByVersion(ctx, orgID, latestVersion)
|
||||
}
|
||||
|
||||
func (m *module) ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error) {
|
||||
var stored []pipelinetypes.StoreablePipeline
|
||||
err := m.sqlstore.BunDB().NewSelect().
|
||||
Model(&stored).
|
||||
Join("JOIN agent_config_element e ON p.id = e.element_id").
|
||||
Join("JOIN agent_config_version v ON v.id = e.version_id").
|
||||
Where("e.element_type = ?", opamptypes.ElementTypeLogPipelines.StringValue()).
|
||||
Where("v.version = ?", version).
|
||||
Where("v.org_id = ?", orgID.StringValue()).
|
||||
Order("p.order_id ASC").
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pipelines := make([]pipelinetypes.GettablePipeline, len(stored))
|
||||
if len(stored) == 0 {
|
||||
return pipelines, nil
|
||||
}
|
||||
|
||||
for i := range stored {
|
||||
pipelines[i].StoreablePipeline = stored[i]
|
||||
if err := pipelines[i].ParseRawConfig(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := pipelines[i].ParseFilter(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return pipelines, nil
|
||||
}
|
||||
|
||||
func (m *module) GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *module) CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
|
||||
storeable, err := pipeline.ToStoreablePipeline()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// regenerate the id and set other fields
|
||||
storeable.Identifiable.ID = valuer.GenerateUUID()
|
||||
storeable.OrgID = orgID.String()
|
||||
storeable.TimeAuditable = types.TimeAuditable{
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
storeable.UserAuditable = types.UserAuditable{
|
||||
CreatedBy: claims.Email,
|
||||
}
|
||||
|
||||
_, err = m.sqlstore.BunDB().NewInsert().
|
||||
Model(&storeable).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
zap.L().Error("error in inserting pipeline data", zap.Error(err))
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to insert pipeline")
|
||||
}
|
||||
|
||||
return &pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: *storeable,
|
||||
Filter: pipeline.Filter,
|
||||
Config: pipeline.Config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *module) UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
|
||||
if err := pipeline.IsValid(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
storeable, err := pipeline.ToStoreablePipeline()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
storeable.OrgID = orgID.String()
|
||||
storeable.TimeAuditable = types.TimeAuditable{
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
storeable.UserAuditable = types.UserAuditable{
|
||||
UpdatedBy: claims.Email,
|
||||
}
|
||||
|
||||
// get id from storeable pipeline
|
||||
id := storeable.ID.StringValue()
|
||||
|
||||
// depending on the order_id update the rest of the table
|
||||
// example 1: total available pipelines are 6, and order_id 5 is moved to 2, then we need to update the rest of the table
|
||||
// old: 1, 2, 3, 4, 5, 6
|
||||
// ^ |
|
||||
// |_________|
|
||||
// So pipelines starting from 2nd position till 4th position shift to right (or increase their order_id) by 1 position
|
||||
// example 2: total available pipelines are 6, and order_id 2 is moved to 4, then we need to update the rest of the table
|
||||
// old: 1, 2, 3, 4, 5, 6
|
||||
// | ^
|
||||
// |_____|
|
||||
// So pipelines starting from 3rd position till 4th position shift to left (or decrease their order_id) by 1 position
|
||||
if err := m.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
|
||||
db := m.sqlstore.BunDBCtx(ctx)
|
||||
|
||||
var existing pipelinetypes.StoreablePipeline
|
||||
if err := db.NewSelect().
|
||||
Model(&existing).
|
||||
Where("id = ?", id).
|
||||
Where("org_id = ?", orgID.StringValue()).
|
||||
Scan(ctx); err != nil {
|
||||
return m.sqlstore.WrapNotFoundErrf(
|
||||
err,
|
||||
errors.CodeNotFound,
|
||||
"pipeline with id %s does not exist in org %s",
|
||||
id,
|
||||
orgID.StringValue(),
|
||||
)
|
||||
}
|
||||
|
||||
oldOrderID := existing.OrderID
|
||||
newOrderID := storeable.OrderID
|
||||
|
||||
// Reorder other pipelines if the order has changed.
|
||||
if newOrderID != oldOrderID {
|
||||
if err := reorderPipelinesInTx(ctx, db, orgID.StringValue(), oldOrderID, newOrderID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve primary key and immutable fields.
|
||||
storeable.ID = existing.ID
|
||||
|
||||
// Persist the updated pipeline (including its new order).
|
||||
if _, err := db.NewUpdate().
|
||||
Model(storeable).
|
||||
Where("id = ?", id).
|
||||
Where("org_id = ?", orgID.StringValue()).
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: *storeable,
|
||||
Filter: pipeline.Filter,
|
||||
Config: pipeline.Config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// reorderPipelinesInTx updates order_id of other pipelines in a transaction-aware way.
|
||||
// It assumes that all pipelines for a given org have consecutive order_id values starting from 1.
|
||||
// The logic is:
|
||||
// - When moving a pipeline from a higher position to a lower position (e.g., 5 -> 2),
|
||||
// all pipelines in [newOrderID, oldOrderID) are shifted right by +1.
|
||||
// - When moving from a lower position to a higher position (e.g., 2 -> 4),
|
||||
// all pipelines in (oldOrderID, newOrderID] are shifted left by -1.
|
||||
func reorderPipelinesInTx(ctx context.Context, tx bun.IDB, orgID string, oldOrderID, newOrderID int) error {
|
||||
switch {
|
||||
case newOrderID < oldOrderID:
|
||||
// Move up: shift affected pipelines down (order_id + 1).
|
||||
_, err := tx.NewUpdate().
|
||||
Model((*pipelinetypes.StoreablePipeline)(nil)).
|
||||
Set("order_id = order_id + 1").
|
||||
Where("org_id = ?", orgID).
|
||||
Where("order_id >= ?", newOrderID).
|
||||
Where("order_id < ?", oldOrderID).
|
||||
Exec(ctx)
|
||||
return err
|
||||
case newOrderID > oldOrderID:
|
||||
// Move down: shift affected pipelines up (order_id - 1).
|
||||
_, err := tx.NewUpdate().
|
||||
Model((*pipelinetypes.StoreablePipeline)(nil)).
|
||||
Set("order_id = order_id - 1").
|
||||
Where("org_id = ?", orgID).
|
||||
Where("order_id > ?", oldOrderID).
|
||||
Where("order_id <= ?", newOrderID).
|
||||
Exec(ctx)
|
||||
return err
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (m *module) DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error {
|
||||
return nil
|
||||
}
|
||||
27
pkg/modules/logspipeline/logspipeline.go
Normal file
27
pkg/modules/logspipeline/logspipeline.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package logspipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Module interface {
|
||||
ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error)
|
||||
ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error)
|
||||
GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error)
|
||||
CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
|
||||
UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
|
||||
DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error
|
||||
}
|
||||
|
||||
type Handler interface {
|
||||
ListPipelines(w http.ResponseWriter, r *http.Request)
|
||||
GetPipeline(w http.ResponseWriter, r *http.Request)
|
||||
CreatePipeline(w http.ResponseWriter, r *http.Request)
|
||||
UpdatePipeline(w http.ResponseWriter, r *http.Request)
|
||||
DeletePipeline(w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser"
|
||||
|
||||
@@ -4048,26 +4049,6 @@ func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
|
||||
aH.WriteJSON(w, r, model.GetLogsAggregatesResponse{})
|
||||
}
|
||||
|
||||
func parseAgentConfigVersion(r *http.Request) (int, error) {
|
||||
versionString := mux.Vars(r)["version"]
|
||||
|
||||
if versionString == "latest" {
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
version64, err := strconv.ParseInt(versionString, 0, 8)
|
||||
|
||||
if err != nil {
|
||||
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
if version64 <= 0 {
|
||||
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
return int(version64), nil
|
||||
}
|
||||
|
||||
func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
|
||||
req := logparsingpipeline.PipelinesPreviewRequest{}
|
||||
|
||||
@@ -4098,7 +4079,7 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
|
||||
return
|
||||
}
|
||||
|
||||
version, err := parseAgentConfigVersion(r)
|
||||
version, err := impllogspipeline.ParseAgentConfigVersion(r)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
|
||||
@@ -13,6 +13,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
@@ -66,6 +68,7 @@ type Modules struct {
|
||||
SpanPercentile spanpercentile.Module
|
||||
MetricsExplorer metricsexplorer.Module
|
||||
Promote promote.Module
|
||||
LogsPipeline logspipeline.Module
|
||||
}
|
||||
|
||||
func NewModules(
|
||||
@@ -110,5 +113,6 @@ func NewModules(
|
||||
Services: implservices.NewModule(querier, telemetryStore),
|
||||
MetricsExplorer: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, ruleStore, dashboard, providerSettings, config.MetricsExplorer),
|
||||
Promote: implpromote.NewModule(telemetryMetadataStore, telemetryStore),
|
||||
LogsPipeline: impllogspipeline.NewModule(sqlstore),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/fields"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||
@@ -59,6 +60,7 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
|
||||
struct{ authz.Handler }{},
|
||||
struct{ zeus.Handler }{},
|
||||
struct{ querier.Handler }{},
|
||||
struct{ logspipeline.Handler }{},
|
||||
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/global/signozglobal"
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
|
||||
@@ -254,6 +255,7 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
|
||||
handlers.AuthzHandler,
|
||||
handlers.ZeusHandler,
|
||||
handlers.QuerierHandler,
|
||||
impllogspipeline.NewHandler(modules.LogsPipeline),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
@@ -266,6 +267,37 @@ func (p *PostablePipeline) IsValid() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PostablePipeline) ToStoreablePipeline() (*StoreablePipeline, error) {
|
||||
rawConfig, err := json.Marshal(p.Config)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to unmarshal postable pipeline config")
|
||||
}
|
||||
|
||||
filter, err := json.Marshal(p.Filter)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to marshal postable pipeline filter")
|
||||
}
|
||||
identifier := valuer.GenerateUUID()
|
||||
if p.ID != "" {
|
||||
identifier, err = valuer.NewUUID(p.ID)
|
||||
if err != nil {
|
||||
return nil, errors.WithAdditionalf(err, "failed to parse postable pipeline id")
|
||||
}
|
||||
}
|
||||
return &StoreablePipeline{
|
||||
Identifiable: types.Identifiable{
|
||||
ID: identifier,
|
||||
},
|
||||
OrderID: p.OrderID,
|
||||
Enabled: p.Enabled,
|
||||
Name: p.Name,
|
||||
Alias: p.Alias,
|
||||
Description: p.Description,
|
||||
FilterString: string(filter),
|
||||
ConfigJSON: string(rawConfig),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func isValidOperator(op PipelineOperator) error {
|
||||
if op.ID == "" {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "PipelineOperator.ID is required")
|
||||
|
||||
Reference in New Issue
Block a user