mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-24 17:23:19 +00:00
Compare commits
2 Commits
nv/7069
...
pipelinesv
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ae55a51d66 | ||
|
|
023e315a22 |
2
.github/workflows/integrationci.yaml
vendored
2
.github/workflows/integrationci.yaml
vendored
@@ -54,7 +54,7 @@ jobs:
|
||||
- sqlite
|
||||
clickhouse-version:
|
||||
- 25.5.6
|
||||
- 25.12.5
|
||||
- 25.10.5
|
||||
schema-migrator-version:
|
||||
- v0.142.0
|
||||
postgres-version:
|
||||
|
||||
@@ -308,15 +308,3 @@ export const PublicDashboardPage = Loadable(
|
||||
/* webpackChunkName: "Public Dashboard Page" */ 'pages/PublicDashboard'
|
||||
),
|
||||
);
|
||||
|
||||
export const AlertTypeSelectionPage = Loadable(
|
||||
() =>
|
||||
import(
|
||||
/* webpackChunkName: "Alert Type Selection Page" */ 'pages/AlertTypeSelection'
|
||||
),
|
||||
);
|
||||
|
||||
export const MeterExplorerPage = Loadable(
|
||||
() =>
|
||||
import(/* webpackChunkName: "Meter Explorer Page" */ 'pages/MeterExplorer'),
|
||||
);
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
import { RouteProps } from 'react-router-dom';
|
||||
import ROUTES from 'constants/routes';
|
||||
import AlertTypeSelectionPage from 'pages/AlertTypeSelection';
|
||||
import MessagingQueues from 'pages/MessagingQueues';
|
||||
import MeterExplorer from 'pages/MeterExplorer';
|
||||
|
||||
import {
|
||||
AlertHistory,
|
||||
AlertOverview,
|
||||
AlertTypeSelectionPage,
|
||||
AllAlertChannels,
|
||||
AllErrors,
|
||||
ApiMonitoring,
|
||||
@@ -27,8 +29,6 @@ import {
|
||||
LogsExplorer,
|
||||
LogsIndexToFields,
|
||||
LogsSaveViews,
|
||||
MessagingQueuesMainPage,
|
||||
MeterExplorerPage,
|
||||
MetricsExplorer,
|
||||
OldLogsExplorer,
|
||||
Onboarding,
|
||||
@@ -399,28 +399,28 @@ const routes: AppRoutes[] = [
|
||||
{
|
||||
path: ROUTES.MESSAGING_QUEUES_KAFKA,
|
||||
exact: true,
|
||||
component: MessagingQueuesMainPage,
|
||||
component: MessagingQueues,
|
||||
key: 'MESSAGING_QUEUES_KAFKA',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.MESSAGING_QUEUES_CELERY_TASK,
|
||||
exact: true,
|
||||
component: MessagingQueuesMainPage,
|
||||
component: MessagingQueues,
|
||||
key: 'MESSAGING_QUEUES_CELERY_TASK',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.MESSAGING_QUEUES_OVERVIEW,
|
||||
exact: true,
|
||||
component: MessagingQueuesMainPage,
|
||||
component: MessagingQueues,
|
||||
key: 'MESSAGING_QUEUES_OVERVIEW',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.MESSAGING_QUEUES_KAFKA_DETAIL,
|
||||
exact: true,
|
||||
component: MessagingQueuesMainPage,
|
||||
component: MessagingQueues,
|
||||
key: 'MESSAGING_QUEUES_KAFKA_DETAIL',
|
||||
isPrivate: true,
|
||||
},
|
||||
@@ -463,21 +463,21 @@ const routes: AppRoutes[] = [
|
||||
{
|
||||
path: ROUTES.METER,
|
||||
exact: true,
|
||||
component: MeterExplorerPage,
|
||||
component: MeterExplorer,
|
||||
key: 'METER',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.METER_EXPLORER,
|
||||
exact: true,
|
||||
component: MeterExplorerPage,
|
||||
component: MeterExplorer,
|
||||
key: 'METER_EXPLORER',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.METER_EXPLORER_VIEWS,
|
||||
exact: true,
|
||||
component: MeterExplorerPage,
|
||||
component: MeterExplorer,
|
||||
key: 'METER_EXPLORER_VIEWS',
|
||||
isPrivate: true,
|
||||
},
|
||||
|
||||
12
pkg/apiserver/signozapiserver/logspipeline.go
Normal file
12
pkg/apiserver/signozapiserver/logspipeline.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package signozapiserver
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
func (provider *provider) addLogspipelineRoutes(router *mux.Router) error {
|
||||
router.HandleFunc("/api/v2/pipelines", provider.logspipelineHandler.ListPipelines).Methods(http.MethodGet)
|
||||
return nil
|
||||
}
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/fields"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||
@@ -48,6 +49,7 @@ type provider struct {
|
||||
authzHandler authz.Handler
|
||||
zeusHandler zeus.Handler
|
||||
querierHandler querier.Handler
|
||||
logspipelineHandler logspipeline.Handler
|
||||
}
|
||||
|
||||
func NewFactory(
|
||||
@@ -69,6 +71,7 @@ func NewFactory(
|
||||
authzHandler authz.Handler,
|
||||
zeusHandler zeus.Handler,
|
||||
querierHandler querier.Handler,
|
||||
logspipelineHandler logspipeline.Handler,
|
||||
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
|
||||
return newProvider(
|
||||
@@ -93,6 +96,7 @@ func NewFactory(
|
||||
authzHandler,
|
||||
zeusHandler,
|
||||
querierHandler,
|
||||
logspipelineHandler,
|
||||
)
|
||||
})
|
||||
}
|
||||
@@ -119,6 +123,7 @@ func newProvider(
|
||||
authzHandler authz.Handler,
|
||||
zeusHandler zeus.Handler,
|
||||
querierHandler querier.Handler,
|
||||
logspipelineHandler logspipeline.Handler,
|
||||
) (apiserver.APIServer, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
|
||||
router := mux.NewRouter().UseEncodedPath()
|
||||
@@ -143,6 +148,7 @@ func newProvider(
|
||||
authzHandler: authzHandler,
|
||||
zeusHandler: zeusHandler,
|
||||
querierHandler: querierHandler,
|
||||
logspipelineHandler: logspipelineHandler,
|
||||
}
|
||||
|
||||
provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
|
||||
@@ -223,9 +229,14 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := provider.addLogspipelineRoutes(router); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
func newSecuritySchemes(role types.Role) []handler.OpenAPISecurityScheme {
|
||||
return []handler.OpenAPISecurityScheme{
|
||||
{Name: ctxtypes.AuthTypeAPIKey.StringValue(), Scopes: []string{role.String()}},
|
||||
|
||||
91
pkg/modules/logspipeline/impllogspipeline/handler.go
Normal file
91
pkg/modules/logspipeline/impllogspipeline/handler.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package impllogspipeline
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
module logspipeline.Module
|
||||
}
|
||||
|
||||
func NewHandler(module logspipeline.Module) logspipeline.Handler {
|
||||
return &handler{module: module}
|
||||
}
|
||||
|
||||
func (h *handler) ListPipelines(w http.ResponseWriter, r *http.Request) {
|
||||
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID, errv2 := valuer.NewUUID(claims.OrgID)
|
||||
if errv2 != nil {
|
||||
render.Error(w, errv2)
|
||||
return
|
||||
}
|
||||
|
||||
version, err := ParseAgentConfigVersion(r)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
if version != -1 {
|
||||
pipelines, err := h.module.ListPipelinesByVersion(r.Context(), orgID, version)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
render.Success(w, http.StatusOK, pipelines)
|
||||
return
|
||||
}
|
||||
|
||||
pipelines, err := h.module.ListPipelines(r.Context(), orgID)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(w, http.StatusOK, pipelines)
|
||||
}
|
||||
|
||||
func (h *handler) GetPipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *handler) CreatePipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *handler) UpdatePipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *handler) DeletePipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func ParseAgentConfigVersion(r *http.Request) (int, error) {
|
||||
versionString := mux.Vars(r)["version"]
|
||||
|
||||
if versionString == "latest" {
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
version64, err := strconv.ParseInt(versionString, 0, 8)
|
||||
|
||||
if err != nil {
|
||||
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
if version64 <= 0 {
|
||||
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
return int(version64), nil
|
||||
}
|
||||
230
pkg/modules/logspipeline/impllogspipeline/module.go
Normal file
230
pkg/modules/logspipeline/impllogspipeline/module.go
Normal file
@@ -0,0 +1,230 @@
|
||||
package impllogspipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/uptrace/bun"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type module struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewModule(sqlstore sqlstore.SQLStore) logspipeline.Module {
|
||||
return &module{sqlstore: sqlstore}
|
||||
}
|
||||
|
||||
func (m *module) ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error) {
|
||||
latestVersion := -1
|
||||
// get latest agent config
|
||||
lastestConfig, err := agentConf.GetLatestVersion(ctx, orgID, opamptypes.ElementTypeLogPipelines)
|
||||
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if lastestConfig != nil {
|
||||
latestVersion = lastestConfig.Version
|
||||
}
|
||||
return m.ListPipelinesByVersion(ctx, orgID, latestVersion)
|
||||
}
|
||||
|
||||
func (m *module) ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error) {
|
||||
var stored []pipelinetypes.StoreablePipeline
|
||||
err := m.sqlstore.BunDB().NewSelect().
|
||||
Model(&stored).
|
||||
Join("JOIN agent_config_element e ON p.id = e.element_id").
|
||||
Join("JOIN agent_config_version v ON v.id = e.version_id").
|
||||
Where("e.element_type = ?", opamptypes.ElementTypeLogPipelines.StringValue()).
|
||||
Where("v.version = ?", version).
|
||||
Where("v.org_id = ?", orgID.StringValue()).
|
||||
Order("p.order_id ASC").
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pipelines := make([]pipelinetypes.GettablePipeline, len(stored))
|
||||
if len(stored) == 0 {
|
||||
return pipelines, nil
|
||||
}
|
||||
|
||||
for i := range stored {
|
||||
pipelines[i].StoreablePipeline = stored[i]
|
||||
if err := pipelines[i].ParseRawConfig(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := pipelines[i].ParseFilter(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return pipelines, nil
|
||||
}
|
||||
|
||||
func (m *module) GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *module) CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
|
||||
storeable, err := pipeline.ToStoreablePipeline()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// regenerate the id and set other fields
|
||||
storeable.Identifiable.ID = valuer.GenerateUUID()
|
||||
storeable.OrgID = orgID.String()
|
||||
storeable.TimeAuditable = types.TimeAuditable{
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
storeable.UserAuditable = types.UserAuditable{
|
||||
CreatedBy: claims.Email,
|
||||
}
|
||||
|
||||
_, err = m.sqlstore.BunDB().NewInsert().
|
||||
Model(&storeable).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
zap.L().Error("error in inserting pipeline data", zap.Error(err))
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to insert pipeline")
|
||||
}
|
||||
|
||||
return &pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: *storeable,
|
||||
Filter: pipeline.Filter,
|
||||
Config: pipeline.Config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *module) UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
|
||||
if err := pipeline.IsValid(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
storeable, err := pipeline.ToStoreablePipeline()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
storeable.OrgID = orgID.String()
|
||||
storeable.TimeAuditable = types.TimeAuditable{
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
storeable.UserAuditable = types.UserAuditable{
|
||||
UpdatedBy: claims.Email,
|
||||
}
|
||||
|
||||
// get id from storeable pipeline
|
||||
id := storeable.ID.StringValue()
|
||||
|
||||
// depending on the order_id update the rest of the table
|
||||
// example 1: total available pipelines are 6, and order_id 5 is moved to 2, then we need to update the rest of the table
|
||||
// old: 1, 2, 3, 4, 5, 6
|
||||
// ^ |
|
||||
// |_________|
|
||||
// So pipelines starting from 2nd position till 4th position shift to right (or increase their order_id) by 1 position
|
||||
// example 2: total available pipelines are 6, and order_id 2 is moved to 4, then we need to update the rest of the table
|
||||
// old: 1, 2, 3, 4, 5, 6
|
||||
// | ^
|
||||
// |_____|
|
||||
// So pipelines starting from 3rd position till 4th position shift to left (or decrease their order_id) by 1 position
|
||||
if err := m.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
|
||||
db := m.sqlstore.BunDBCtx(ctx)
|
||||
|
||||
var existing pipelinetypes.StoreablePipeline
|
||||
if err := db.NewSelect().
|
||||
Model(&existing).
|
||||
Where("id = ?", id).
|
||||
Where("org_id = ?", orgID.StringValue()).
|
||||
Scan(ctx); err != nil {
|
||||
return m.sqlstore.WrapNotFoundErrf(
|
||||
err,
|
||||
errors.CodeNotFound,
|
||||
"pipeline with id %s does not exist in org %s",
|
||||
id,
|
||||
orgID.StringValue(),
|
||||
)
|
||||
}
|
||||
|
||||
oldOrderID := existing.OrderID
|
||||
newOrderID := storeable.OrderID
|
||||
|
||||
// Reorder other pipelines if the order has changed.
|
||||
if newOrderID != oldOrderID {
|
||||
if err := reorderPipelinesInTx(ctx, db, orgID.StringValue(), oldOrderID, newOrderID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve primary key and immutable fields.
|
||||
storeable.ID = existing.ID
|
||||
|
||||
// Persist the updated pipeline (including its new order).
|
||||
if _, err := db.NewUpdate().
|
||||
Model(storeable).
|
||||
Where("id = ?", id).
|
||||
Where("org_id = ?", orgID.StringValue()).
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: *storeable,
|
||||
Filter: pipeline.Filter,
|
||||
Config: pipeline.Config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// reorderPipelinesInTx updates order_id of other pipelines in a transaction-aware way.
|
||||
// It assumes that all pipelines for a given org have consecutive order_id values starting from 1.
|
||||
// The logic is:
|
||||
// - When moving a pipeline from a higher position to a lower position (e.g., 5 -> 2),
|
||||
// all pipelines in [newOrderID, oldOrderID) are shifted right by +1.
|
||||
// - When moving from a lower position to a higher position (e.g., 2 -> 4),
|
||||
// all pipelines in (oldOrderID, newOrderID] are shifted left by -1.
|
||||
func reorderPipelinesInTx(ctx context.Context, tx bun.IDB, orgID string, oldOrderID, newOrderID int) error {
|
||||
switch {
|
||||
case newOrderID < oldOrderID:
|
||||
// Move up: shift affected pipelines down (order_id + 1).
|
||||
_, err := tx.NewUpdate().
|
||||
Model((*pipelinetypes.StoreablePipeline)(nil)).
|
||||
Set("order_id = order_id + 1").
|
||||
Where("org_id = ?", orgID).
|
||||
Where("order_id >= ?", newOrderID).
|
||||
Where("order_id < ?", oldOrderID).
|
||||
Exec(ctx)
|
||||
return err
|
||||
case newOrderID > oldOrderID:
|
||||
// Move down: shift affected pipelines up (order_id - 1).
|
||||
_, err := tx.NewUpdate().
|
||||
Model((*pipelinetypes.StoreablePipeline)(nil)).
|
||||
Set("order_id = order_id - 1").
|
||||
Where("org_id = ?", orgID).
|
||||
Where("order_id > ?", oldOrderID).
|
||||
Where("order_id <= ?", newOrderID).
|
||||
Exec(ctx)
|
||||
return err
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (m *module) DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error {
|
||||
return nil
|
||||
}
|
||||
27
pkg/modules/logspipeline/logspipeline.go
Normal file
27
pkg/modules/logspipeline/logspipeline.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package logspipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Module interface {
|
||||
ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error)
|
||||
ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error)
|
||||
GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error)
|
||||
CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
|
||||
UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
|
||||
DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error
|
||||
}
|
||||
|
||||
type Handler interface {
|
||||
ListPipelines(w http.ResponseWriter, r *http.Request)
|
||||
GetPipeline(w http.ResponseWriter, r *http.Request)
|
||||
CreatePipeline(w http.ResponseWriter, r *http.Request)
|
||||
UpdatePipeline(w http.ResponseWriter, r *http.Request)
|
||||
DeletePipeline(w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
@@ -120,8 +120,6 @@ func FilterResponse(results []*qbtypes.QueryRangeResponse) []*qbtypes.QueryRange
|
||||
}
|
||||
}
|
||||
resultData.Rows = filteredRows
|
||||
case *qbtypes.ScalarData:
|
||||
resultData.Data = filterScalarDataIPs(resultData.Columns, resultData.Data)
|
||||
}
|
||||
|
||||
filteredData = append(filteredData, result)
|
||||
@@ -147,39 +145,6 @@ func shouldIncludeSeries(series *qbtypes.TimeSeries) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func filterScalarDataIPs(columns []*qbtypes.ColumnDescriptor, data [][]any) [][]any {
|
||||
// Find column indices for server address fields
|
||||
serverColIndices := make([]int, 0)
|
||||
for i, col := range columns {
|
||||
if col.Name == derivedKeyHTTPHost {
|
||||
serverColIndices = append(serverColIndices, i)
|
||||
}
|
||||
}
|
||||
|
||||
if len(serverColIndices) == 0 {
|
||||
return data
|
||||
}
|
||||
|
||||
filtered := make([][]any, 0, len(data))
|
||||
for _, row := range data {
|
||||
includeRow := true
|
||||
for _, colIdx := range serverColIndices {
|
||||
if colIdx < len(row) {
|
||||
if strVal, ok := row[colIdx].(string); ok {
|
||||
if net.ParseIP(strVal) != nil {
|
||||
includeRow = false
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if includeRow {
|
||||
filtered = append(filtered, row)
|
||||
}
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
func shouldIncludeRow(row *qbtypes.RawRow) bool {
|
||||
if row.Data != nil {
|
||||
if domainVal, ok := row.Data[derivedKeyHTTPHost]; ok {
|
||||
|
||||
@@ -117,59 +117,6 @@ func TestFilterResponse(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "should filter out IP addresses from scalar data",
|
||||
input: []*qbtypes.QueryRangeResponse{
|
||||
{
|
||||
Data: qbtypes.QueryData{
|
||||
Results: []any{
|
||||
&qbtypes.ScalarData{
|
||||
QueryName: "endpoints",
|
||||
Columns: []*qbtypes.ColumnDescriptor{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
|
||||
Type: qbtypes.ColumnTypeGroup,
|
||||
},
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
|
||||
Type: qbtypes.ColumnTypeAggregation,
|
||||
},
|
||||
},
|
||||
Data: [][]any{
|
||||
{"192.168.1.1", 10},
|
||||
{"example.com", 20},
|
||||
{"10.0.0.1", 5},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: []*qbtypes.QueryRangeResponse{
|
||||
{
|
||||
Data: qbtypes.QueryData{
|
||||
Results: []any{
|
||||
&qbtypes.ScalarData{
|
||||
QueryName: "endpoints",
|
||||
Columns: []*qbtypes.ColumnDescriptor{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
|
||||
Type: qbtypes.ColumnTypeGroup,
|
||||
},
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
|
||||
Type: qbtypes.ColumnTypeAggregation,
|
||||
},
|
||||
},
|
||||
Data: [][]any{
|
||||
{"example.com", 20},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser"
|
||||
|
||||
@@ -4048,26 +4049,6 @@ func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
|
||||
aH.WriteJSON(w, r, model.GetLogsAggregatesResponse{})
|
||||
}
|
||||
|
||||
func parseAgentConfigVersion(r *http.Request) (int, error) {
|
||||
versionString := mux.Vars(r)["version"]
|
||||
|
||||
if versionString == "latest" {
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
version64, err := strconv.ParseInt(versionString, 0, 8)
|
||||
|
||||
if err != nil {
|
||||
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
if version64 <= 0 {
|
||||
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
return int(version64), nil
|
||||
}
|
||||
|
||||
func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
|
||||
req := logparsingpipeline.PipelinesPreviewRequest{}
|
||||
|
||||
@@ -4098,7 +4079,7 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
|
||||
return
|
||||
}
|
||||
|
||||
version, err := parseAgentConfigVersion(r)
|
||||
version, err := impllogspipeline.ParseAgentConfigVersion(r)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
|
||||
@@ -13,6 +13,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
@@ -66,6 +68,7 @@ type Modules struct {
|
||||
SpanPercentile spanpercentile.Module
|
||||
MetricsExplorer metricsexplorer.Module
|
||||
Promote promote.Module
|
||||
LogsPipeline logspipeline.Module
|
||||
}
|
||||
|
||||
func NewModules(
|
||||
@@ -110,5 +113,6 @@ func NewModules(
|
||||
Services: implservices.NewModule(querier, telemetryStore),
|
||||
MetricsExplorer: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, ruleStore, dashboard, providerSettings, config.MetricsExplorer),
|
||||
Promote: implpromote.NewModule(telemetryMetadataStore, telemetryStore),
|
||||
LogsPipeline: impllogspipeline.NewModule(sqlstore),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/fields"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||
@@ -59,6 +60,7 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
|
||||
struct{ authz.Handler }{},
|
||||
struct{ zeus.Handler }{},
|
||||
struct{ querier.Handler }{},
|
||||
struct{ logspipeline.Handler }{},
|
||||
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/global/signozglobal"
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
|
||||
@@ -254,6 +255,7 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
|
||||
handlers.AuthzHandler,
|
||||
handlers.ZeusHandler,
|
||||
handlers.QuerierHandler,
|
||||
impllogspipeline.NewHandler(modules.LogsPipeline),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -26,9 +26,6 @@ const (
|
||||
IncreaseMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, multiIf(row_number() OVER rate_window = 1, nan, (%s - lagInFrame(%s, 1) OVER rate_window) < 0, %s, (%s - lagInFrame(%s, 1) OVER rate_window))) AS per_series_value`
|
||||
|
||||
OthersMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, %s) AS per_series_value`
|
||||
|
||||
RateWithStartTs = "multiIf(row_number() OVER rate_window = 1 AND earliest_start_ts < %d, NAN, row_number() OVER rate_window = 1, sum_all_values / (ts - earliest_start_ts), earliest_start_ts = lagInFrame(latest_start_ts, 1) OVER rate_window, (sum_all_values - lagInFrame(latest_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window), sum_all_values / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value"
|
||||
IncreaseWithStartTs = "multiIf(row_number() OVER rate_window = 1 AND earliest_start_ts < %d, NAN, row_number() OVER rate_window = 1, sum_all_values, earliest_start_ts = lagInFrame(latest_start_ts, 1) OVER rate_window, sum_all_values - lagInFrame(latest_value, 1) OVER rate_window, sum_all_values) AS per_series_value"
|
||||
)
|
||||
|
||||
type MetricQueryStatementBuilder struct {
|
||||
@@ -333,9 +330,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggregationCTE(
|
||||
if query.Aggregations[0].Temporality == metrictypes.Delta {
|
||||
return b.buildTemporalAggDelta(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
} else if query.Aggregations[0].Temporality != metrictypes.Multiple {
|
||||
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationIncrease || query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
|
||||
return b.buildTemporalAggCumulativeOrUnspecifiedWithStartTs(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
}
|
||||
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
}
|
||||
return b.buildTemporalAggForMultipleTemporalities(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
@@ -388,115 +382,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
|
||||
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
|
||||
}
|
||||
|
||||
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecifiedWithStartTs(
|
||||
ctx context.Context,
|
||||
start, end uint64,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
||||
timeSeriesCTE string,
|
||||
timeSeriesCTEArgs []any,
|
||||
) (string, []any, error) {
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
if tbl == SamplesV4TableName {
|
||||
return b.buildTemporalAggCumulativeOrUnspecifiedWithStartTsForSamplesTbl(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
}
|
||||
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
}
|
||||
|
||||
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecifiedWithStartTsForSamplesTbl(
|
||||
_ context.Context,
|
||||
start, end uint64,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
||||
timeSeriesCTE string,
|
||||
timeSeriesCTEArgs []any,
|
||||
) (string, []any, error) {
|
||||
stepSec := int64(query.StepInterval.Seconds())
|
||||
|
||||
moreInfoQueryBuilder := sqlbuilder.NewSelectBuilder()
|
||||
moreInfoQueryBuilder.Select("fingerprint")
|
||||
moreInfoQueryBuilder.SelectMore(fmt.Sprintf(
|
||||
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
|
||||
stepSec,
|
||||
))
|
||||
moreInfoQueryBuilder.SelectMore("toDateTime(intDiv(start_timestamp_unix_milli, 1000)) AS start_ts")
|
||||
for _, g := range query.GroupBy {
|
||||
moreInfoQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
|
||||
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
moreInfoQueryBuilder.SelectMore(fmt.Sprintf("%s AS max_value", aggCol))
|
||||
|
||||
colWithLatestValue, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, metrictypes.TimeAggregationLatest, query.Aggregations[0].TableHints)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
moreInfoQueryBuilder.SelectMore(fmt.Sprintf("%s AS latest_value", colWithLatestValue))
|
||||
moreInfoQueryBuilder.SelectMore("max(unix_milli) AS latest_timestamp")
|
||||
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
moreInfoQueryBuilder.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
|
||||
moreInfoQueryBuilder.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
|
||||
moreInfoQueryBuilder.Where(
|
||||
moreInfoQueryBuilder.In("metric_name", query.Aggregations[0].MetricName),
|
||||
moreInfoQueryBuilder.GTE("unix_milli", start),
|
||||
moreInfoQueryBuilder.LT("unix_milli", end),
|
||||
)
|
||||
moreInfoQueryBuilder.GroupBy("fingerprint", "ts", "start_ts")
|
||||
moreInfoQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
|
||||
moreInfoPerRowQuery, moreInfoPerRowArgs := moreInfoQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
|
||||
|
||||
innerQueryBuilder := sqlbuilder.NewSelectBuilder()
|
||||
innerQueryBuilder.Select("fingerprint")
|
||||
innerQueryBuilder.SelectMore("ts")
|
||||
for _, g := range query.GroupBy {
|
||||
innerQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
innerQueryBuilder.SelectMore("max(start_ts) AS latest_start_ts")
|
||||
innerQueryBuilder.SelectMore("min(start_ts) AS earliest_start_ts")
|
||||
innerQueryBuilder.SelectMore("sum(max_value) AS sum_all_values")
|
||||
innerQueryBuilder.SelectMore("argMax(latest_value, latest_timestamp) AS latest_value")
|
||||
|
||||
innerQueryBuilder.From(fmt.Sprintf("(%s)", moreInfoPerRowQuery))
|
||||
|
||||
innerQueryBuilder.GroupBy("fingerprint", "ts")
|
||||
innerQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
|
||||
innerQuery, innerQueryArgs := innerQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse, moreInfoPerRowArgs...)
|
||||
|
||||
switch query.Aggregations[0].TimeAggregation {
|
||||
case metrictypes.TimeAggregationRate:
|
||||
rateExpr := fmt.Sprintf(RateWithStartTs, start)
|
||||
wrapped := sqlbuilder.NewSelectBuilder()
|
||||
wrapped.Select("ts")
|
||||
wrapped.SelectMore("latest_value")
|
||||
for _, g := range query.GroupBy {
|
||||
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
wrapped.SelectMore(rateExpr)
|
||||
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
|
||||
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, moreInfoPerRowArgs...)
|
||||
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
|
||||
|
||||
case metrictypes.TimeAggregationIncrease:
|
||||
incExpr := fmt.Sprintf(IncreaseWithStartTs, start)
|
||||
wrapped := sqlbuilder.NewSelectBuilder()
|
||||
wrapped.Select("ts")
|
||||
wrapped.SelectMore("latest_value")
|
||||
for _, g := range query.GroupBy {
|
||||
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
wrapped.SelectMore(incExpr)
|
||||
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
|
||||
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, moreInfoPerRowArgs...)
|
||||
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
|
||||
default:
|
||||
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", innerQuery), innerQueryArgs, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
_ context.Context,
|
||||
start, end uint64,
|
||||
@@ -504,7 +389,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
timeSeriesCTE string,
|
||||
timeSeriesCTEArgs []any,
|
||||
) (string, []any, error) {
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
stepSec := int64(query.StepInterval.Seconds())
|
||||
|
||||
baseSb := sqlbuilder.NewSelectBuilder()
|
||||
@@ -516,9 +400,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
for _, g := range query.GroupBy {
|
||||
baseSb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
if tbl != SamplesV4TableName {
|
||||
baseSb.SelectMore("uniqMerge(num_start_timestamps) AS num_start_timestamps")
|
||||
}
|
||||
|
||||
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
if err != nil {
|
||||
@@ -526,6 +407,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
}
|
||||
baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
|
||||
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
|
||||
baseSb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
|
||||
baseSb.Where(
|
||||
@@ -546,9 +428,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
for _, g := range query.GroupBy {
|
||||
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
if tbl != SamplesV4TableName {
|
||||
wrapped.SelectMore("num_start_timestamps > 1 AS has_restarts")
|
||||
}
|
||||
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", RateTmpl))
|
||||
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
|
||||
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
|
||||
@@ -560,9 +439,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
for _, g := range query.GroupBy {
|
||||
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
if tbl != SamplesV4TableName {
|
||||
wrapped.SelectMore("num_start_timestamps > 1 AS has_restarts")
|
||||
}
|
||||
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", IncreaseTmpl))
|
||||
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
|
||||
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
|
||||
@@ -639,12 +515,11 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
|
||||
|
||||
func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
|
||||
_ context.Context,
|
||||
start uint64,
|
||||
end uint64,
|
||||
_ uint64,
|
||||
_ uint64,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
||||
_ map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
) (string, []any, error) {
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
if query.Aggregations[0].SpaceAggregation.IsZero() {
|
||||
return "", nil, errors.Newf(
|
||||
errors.TypeInvalidInput,
|
||||
@@ -659,9 +534,6 @@ func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
|
||||
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
sb.SelectMore(fmt.Sprintf("%s(per_series_value) AS value", query.Aggregations[0].SpaceAggregation.StringValue()))
|
||||
if tbl != SamplesV4TableName {
|
||||
sb.SelectMore("max(has_restarts) AS has_restarts")
|
||||
}
|
||||
sb.From("__temporal_aggregation_cte")
|
||||
sb.Where(sb.EQ("isNaN(per_series_value)", 0))
|
||||
if query.Aggregations[0].ValueFilter != nil {
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
@@ -266,6 +267,37 @@ func (p *PostablePipeline) IsValid() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PostablePipeline) ToStoreablePipeline() (*StoreablePipeline, error) {
|
||||
rawConfig, err := json.Marshal(p.Config)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to unmarshal postable pipeline config")
|
||||
}
|
||||
|
||||
filter, err := json.Marshal(p.Filter)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to marshal postable pipeline filter")
|
||||
}
|
||||
identifier := valuer.GenerateUUID()
|
||||
if p.ID != "" {
|
||||
identifier, err = valuer.NewUUID(p.ID)
|
||||
if err != nil {
|
||||
return nil, errors.WithAdditionalf(err, "failed to parse postable pipeline id")
|
||||
}
|
||||
}
|
||||
return &StoreablePipeline{
|
||||
Identifiable: types.Identifiable{
|
||||
ID: identifier,
|
||||
},
|
||||
OrderID: p.OrderID,
|
||||
Enabled: p.Enabled,
|
||||
Name: p.Name,
|
||||
Alias: p.Alias,
|
||||
Description: p.Description,
|
||||
FilterString: string(filter),
|
||||
ConfigJSON: string(rawConfig),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func isValidOperator(op PipelineOperator) error {
|
||||
if op.ID == "" {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "PipelineOperator.ID is required")
|
||||
|
||||
Reference in New Issue
Block a user