mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-25 01:33:20 +00:00
Compare commits
2 Commits
fix/remove
...
pipelinesv
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ae55a51d66 | ||
|
|
023e315a22 |
2
.github/workflows/integrationci.yaml
vendored
2
.github/workflows/integrationci.yaml
vendored
@@ -54,7 +54,7 @@ jobs:
|
||||
- sqlite
|
||||
clickhouse-version:
|
||||
- 25.5.6
|
||||
- 25.12.5
|
||||
- 25.10.5
|
||||
schema-migrator-version:
|
||||
- v0.142.0
|
||||
postgres-version:
|
||||
|
||||
@@ -308,15 +308,3 @@ export const PublicDashboardPage = Loadable(
|
||||
/* webpackChunkName: "Public Dashboard Page" */ 'pages/PublicDashboard'
|
||||
),
|
||||
);
|
||||
|
||||
export const AlertTypeSelectionPage = Loadable(
|
||||
() =>
|
||||
import(
|
||||
/* webpackChunkName: "Alert Type Selection Page" */ 'pages/AlertTypeSelection'
|
||||
),
|
||||
);
|
||||
|
||||
export const MeterExplorerPage = Loadable(
|
||||
() =>
|
||||
import(/* webpackChunkName: "Meter Explorer Page" */ 'pages/MeterExplorer'),
|
||||
);
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
import { RouteProps } from 'react-router-dom';
|
||||
import ROUTES from 'constants/routes';
|
||||
import AlertTypeSelectionPage from 'pages/AlertTypeSelection';
|
||||
import MessagingQueues from 'pages/MessagingQueues';
|
||||
import MeterExplorer from 'pages/MeterExplorer';
|
||||
|
||||
import {
|
||||
AlertHistory,
|
||||
AlertOverview,
|
||||
AlertTypeSelectionPage,
|
||||
AllAlertChannels,
|
||||
AllErrors,
|
||||
ApiMonitoring,
|
||||
@@ -27,8 +29,6 @@ import {
|
||||
LogsExplorer,
|
||||
LogsIndexToFields,
|
||||
LogsSaveViews,
|
||||
MessagingQueuesMainPage,
|
||||
MeterExplorerPage,
|
||||
MetricsExplorer,
|
||||
OldLogsExplorer,
|
||||
Onboarding,
|
||||
@@ -399,28 +399,28 @@ const routes: AppRoutes[] = [
|
||||
{
|
||||
path: ROUTES.MESSAGING_QUEUES_KAFKA,
|
||||
exact: true,
|
||||
component: MessagingQueuesMainPage,
|
||||
component: MessagingQueues,
|
||||
key: 'MESSAGING_QUEUES_KAFKA',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.MESSAGING_QUEUES_CELERY_TASK,
|
||||
exact: true,
|
||||
component: MessagingQueuesMainPage,
|
||||
component: MessagingQueues,
|
||||
key: 'MESSAGING_QUEUES_CELERY_TASK',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.MESSAGING_QUEUES_OVERVIEW,
|
||||
exact: true,
|
||||
component: MessagingQueuesMainPage,
|
||||
component: MessagingQueues,
|
||||
key: 'MESSAGING_QUEUES_OVERVIEW',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.MESSAGING_QUEUES_KAFKA_DETAIL,
|
||||
exact: true,
|
||||
component: MessagingQueuesMainPage,
|
||||
component: MessagingQueues,
|
||||
key: 'MESSAGING_QUEUES_KAFKA_DETAIL',
|
||||
isPrivate: true,
|
||||
},
|
||||
@@ -463,21 +463,21 @@ const routes: AppRoutes[] = [
|
||||
{
|
||||
path: ROUTES.METER,
|
||||
exact: true,
|
||||
component: MeterExplorerPage,
|
||||
component: MeterExplorer,
|
||||
key: 'METER',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.METER_EXPLORER,
|
||||
exact: true,
|
||||
component: MeterExplorerPage,
|
||||
component: MeterExplorer,
|
||||
key: 'METER_EXPLORER',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.METER_EXPLORER_VIEWS,
|
||||
exact: true,
|
||||
component: MeterExplorerPage,
|
||||
component: MeterExplorer,
|
||||
key: 'METER_EXPLORER_VIEWS',
|
||||
isPrivate: true,
|
||||
},
|
||||
|
||||
@@ -27,7 +27,7 @@ import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
|
||||
import { FeatureKeys } from '../../../constants/features';
|
||||
import { useAppContext } from '../../../providers/App/App';
|
||||
import { getOrderByFromParams, safeParseJSON } from '../commonUtils';
|
||||
import { getOrderByFromParams } from '../commonUtils';
|
||||
import {
|
||||
GetK8sEntityToAggregateAttribute,
|
||||
INFRA_MONITORING_K8S_PARAMS_KEYS,
|
||||
@@ -103,10 +103,9 @@ function K8sClustersList({
|
||||
const [groupBy, setGroupBy] = useState<IBuilderQuery['groupBy']>(() => {
|
||||
const groupBy = searchParams.get(INFRA_MONITORING_K8S_PARAMS_KEYS.GROUP_BY);
|
||||
if (groupBy) {
|
||||
const parsed = safeParseJSON<IBuilderQuery['groupBy']>(groupBy);
|
||||
if (parsed) {
|
||||
return parsed;
|
||||
}
|
||||
const decoded = decodeURIComponent(groupBy);
|
||||
const parsed = JSON.parse(decoded);
|
||||
return parsed as IBuilderQuery['groupBy'];
|
||||
}
|
||||
return [];
|
||||
});
|
||||
|
||||
@@ -28,7 +28,7 @@ import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
|
||||
import { FeatureKeys } from '../../../constants/features';
|
||||
import { useAppContext } from '../../../providers/App/App';
|
||||
import { getOrderByFromParams, safeParseJSON } from '../commonUtils';
|
||||
import { getOrderByFromParams } from '../commonUtils';
|
||||
import {
|
||||
GetK8sEntityToAggregateAttribute,
|
||||
INFRA_MONITORING_K8S_PARAMS_KEYS,
|
||||
@@ -105,10 +105,9 @@ function K8sDaemonSetsList({
|
||||
const [groupBy, setGroupBy] = useState<IBuilderQuery['groupBy']>(() => {
|
||||
const groupBy = searchParams.get(INFRA_MONITORING_K8S_PARAMS_KEYS.GROUP_BY);
|
||||
if (groupBy) {
|
||||
const parsed = safeParseJSON<IBuilderQuery['groupBy']>(groupBy);
|
||||
if (parsed) {
|
||||
return parsed;
|
||||
}
|
||||
const decoded = decodeURIComponent(groupBy);
|
||||
const parsed = JSON.parse(decoded);
|
||||
return parsed as IBuilderQuery['groupBy'];
|
||||
}
|
||||
return [];
|
||||
});
|
||||
|
||||
@@ -28,7 +28,7 @@ import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
|
||||
import { FeatureKeys } from '../../../constants/features';
|
||||
import { useAppContext } from '../../../providers/App/App';
|
||||
import { getOrderByFromParams, safeParseJSON } from '../commonUtils';
|
||||
import { getOrderByFromParams } from '../commonUtils';
|
||||
import {
|
||||
GetK8sEntityToAggregateAttribute,
|
||||
INFRA_MONITORING_K8S_PARAMS_KEYS,
|
||||
@@ -106,10 +106,9 @@ function K8sDeploymentsList({
|
||||
const [groupBy, setGroupBy] = useState<IBuilderQuery['groupBy']>(() => {
|
||||
const groupBy = searchParams.get(INFRA_MONITORING_K8S_PARAMS_KEYS.GROUP_BY);
|
||||
if (groupBy) {
|
||||
const parsed = safeParseJSON<IBuilderQuery['groupBy']>(groupBy);
|
||||
if (parsed) {
|
||||
return parsed;
|
||||
}
|
||||
const decoded = decodeURIComponent(groupBy);
|
||||
const parsed = JSON.parse(decoded);
|
||||
return parsed as IBuilderQuery['groupBy'];
|
||||
}
|
||||
return [];
|
||||
});
|
||||
|
||||
@@ -28,7 +28,7 @@ import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
|
||||
import { FeatureKeys } from '../../../constants/features';
|
||||
import { useAppContext } from '../../../providers/App/App';
|
||||
import { getOrderByFromParams, safeParseJSON } from '../commonUtils';
|
||||
import { getOrderByFromParams } from '../commonUtils';
|
||||
import {
|
||||
GetK8sEntityToAggregateAttribute,
|
||||
INFRA_MONITORING_K8S_PARAMS_KEYS,
|
||||
@@ -101,10 +101,9 @@ function K8sJobsList({
|
||||
const [groupBy, setGroupBy] = useState<IBuilderQuery['groupBy']>(() => {
|
||||
const groupBy = searchParams.get(INFRA_MONITORING_K8S_PARAMS_KEYS.GROUP_BY);
|
||||
if (groupBy) {
|
||||
const parsed = safeParseJSON<IBuilderQuery['groupBy']>(groupBy);
|
||||
if (parsed) {
|
||||
return parsed;
|
||||
}
|
||||
const decoded = decodeURIComponent(groupBy);
|
||||
const parsed = JSON.parse(decoded);
|
||||
return parsed as IBuilderQuery['groupBy'];
|
||||
}
|
||||
return [];
|
||||
});
|
||||
|
||||
@@ -10,7 +10,6 @@ import { BaseAutocompleteData } from 'types/api/queryBuilder/queryAutocompleteRe
|
||||
import { IBuilderQuery } from 'types/api/queryBuilder/queryBuilderData';
|
||||
import { DataSource } from 'types/common/queryBuilder';
|
||||
|
||||
import { safeParseJSON } from './commonUtils';
|
||||
import { INFRA_MONITORING_K8S_PARAMS_KEYS, K8sCategory } from './constants';
|
||||
import K8sFiltersSidePanel from './K8sFiltersSidePanel/K8sFiltersSidePanel';
|
||||
import { IEntityColumn } from './utils';
|
||||
@@ -59,10 +58,9 @@ function K8sHeader({
|
||||
const urlFilters = searchParams.get(INFRA_MONITORING_K8S_PARAMS_KEYS.FILTERS);
|
||||
let { filters } = currentQuery.builder.queryData[0];
|
||||
if (urlFilters) {
|
||||
const parsed = safeParseJSON<IBuilderQuery['filters']>(urlFilters);
|
||||
if (parsed) {
|
||||
filters = parsed;
|
||||
}
|
||||
const decoded = decodeURIComponent(urlFilters);
|
||||
const parsed = JSON.parse(decoded);
|
||||
filters = parsed;
|
||||
}
|
||||
return {
|
||||
...currentQuery,
|
||||
|
||||
@@ -27,7 +27,7 @@ import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
|
||||
import { FeatureKeys } from '../../../constants/features';
|
||||
import { useAppContext } from '../../../providers/App/App';
|
||||
import { getOrderByFromParams, safeParseJSON } from '../commonUtils';
|
||||
import { getOrderByFromParams } from '../commonUtils';
|
||||
import {
|
||||
GetK8sEntityToAggregateAttribute,
|
||||
INFRA_MONITORING_K8S_PARAMS_KEYS,
|
||||
@@ -104,10 +104,9 @@ function K8sNamespacesList({
|
||||
const [groupBy, setGroupBy] = useState<IBuilderQuery['groupBy']>(() => {
|
||||
const groupBy = searchParams.get(INFRA_MONITORING_K8S_PARAMS_KEYS.GROUP_BY);
|
||||
if (groupBy) {
|
||||
const parsed = safeParseJSON<IBuilderQuery['groupBy']>(groupBy);
|
||||
if (parsed) {
|
||||
return parsed;
|
||||
}
|
||||
const decoded = decodeURIComponent(groupBy);
|
||||
const parsed = JSON.parse(decoded);
|
||||
return parsed as IBuilderQuery['groupBy'];
|
||||
}
|
||||
return [];
|
||||
});
|
||||
|
||||
@@ -27,7 +27,7 @@ import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
|
||||
import { FeatureKeys } from '../../../constants/features';
|
||||
import { useAppContext } from '../../../providers/App/App';
|
||||
import { getOrderByFromParams, safeParseJSON } from '../commonUtils';
|
||||
import { getOrderByFromParams } from '../commonUtils';
|
||||
import {
|
||||
GetK8sEntityToAggregateAttribute,
|
||||
INFRA_MONITORING_K8S_PARAMS_KEYS,
|
||||
@@ -99,10 +99,9 @@ function K8sNodesList({
|
||||
const [groupBy, setGroupBy] = useState<IBuilderQuery['groupBy']>(() => {
|
||||
const groupBy = searchParams.get(INFRA_MONITORING_K8S_PARAMS_KEYS.GROUP_BY);
|
||||
if (groupBy) {
|
||||
const parsed = safeParseJSON<IBuilderQuery['groupBy']>(groupBy);
|
||||
if (parsed) {
|
||||
return parsed;
|
||||
}
|
||||
const decoded = decodeURIComponent(groupBy);
|
||||
const parsed = JSON.parse(decoded);
|
||||
return parsed as IBuilderQuery['groupBy'];
|
||||
}
|
||||
return [];
|
||||
});
|
||||
|
||||
@@ -29,7 +29,7 @@ import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
|
||||
import { FeatureKeys } from '../../../constants/features';
|
||||
import { useAppContext } from '../../../providers/App/App';
|
||||
import { getOrderByFromParams, safeParseJSON } from '../commonUtils';
|
||||
import { getOrderByFromParams } from '../commonUtils';
|
||||
import {
|
||||
GetK8sEntityToAggregateAttribute,
|
||||
INFRA_MONITORING_K8S_PARAMS_KEYS,
|
||||
@@ -92,10 +92,9 @@ function K8sPodsList({
|
||||
const [groupBy, setGroupBy] = useState<IBuilderQuery['groupBy']>(() => {
|
||||
const groupBy = searchParams.get(INFRA_MONITORING_K8S_PARAMS_KEYS.GROUP_BY);
|
||||
if (groupBy) {
|
||||
const parsed = safeParseJSON<IBuilderQuery['groupBy']>(groupBy);
|
||||
if (parsed) {
|
||||
return parsed;
|
||||
}
|
||||
const decoded = decodeURIComponent(groupBy);
|
||||
const parsed = JSON.parse(decoded);
|
||||
return parsed as IBuilderQuery['groupBy'];
|
||||
}
|
||||
return [];
|
||||
});
|
||||
|
||||
@@ -28,7 +28,7 @@ import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
|
||||
import { FeatureKeys } from '../../../constants/features';
|
||||
import { useAppContext } from '../../../providers/App/App';
|
||||
import { getOrderByFromParams, safeParseJSON } from '../commonUtils';
|
||||
import { getOrderByFromParams } from '../commonUtils';
|
||||
import {
|
||||
GetK8sEntityToAggregateAttribute,
|
||||
INFRA_MONITORING_K8S_PARAMS_KEYS,
|
||||
@@ -105,10 +105,9 @@ function K8sStatefulSetsList({
|
||||
const [groupBy, setGroupBy] = useState<IBuilderQuery['groupBy']>(() => {
|
||||
const groupBy = searchParams.get(INFRA_MONITORING_K8S_PARAMS_KEYS.GROUP_BY);
|
||||
if (groupBy) {
|
||||
const parsed = safeParseJSON<IBuilderQuery['groupBy']>(groupBy);
|
||||
if (parsed) {
|
||||
return parsed;
|
||||
}
|
||||
const decoded = decodeURIComponent(groupBy);
|
||||
const parsed = JSON.parse(decoded);
|
||||
return parsed as IBuilderQuery['groupBy'];
|
||||
}
|
||||
return [];
|
||||
});
|
||||
|
||||
@@ -28,7 +28,7 @@ import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
|
||||
import { FeatureKeys } from '../../../constants/features';
|
||||
import { useAppContext } from '../../../providers/App/App';
|
||||
import { getOrderByFromParams, safeParseJSON } from '../commonUtils';
|
||||
import { getOrderByFromParams } from '../commonUtils';
|
||||
import {
|
||||
GetK8sEntityToAggregateAttribute,
|
||||
INFRA_MONITORING_K8S_PARAMS_KEYS,
|
||||
@@ -105,10 +105,9 @@ function K8sVolumesList({
|
||||
const [groupBy, setGroupBy] = useState<IBuilderQuery['groupBy']>(() => {
|
||||
const groupBy = searchParams.get(INFRA_MONITORING_K8S_PARAMS_KEYS.GROUP_BY);
|
||||
if (groupBy) {
|
||||
const parsed = safeParseJSON<IBuilderQuery['groupBy']>(groupBy);
|
||||
if (parsed) {
|
||||
return parsed;
|
||||
}
|
||||
const decoded = decodeURIComponent(groupBy);
|
||||
const parsed = JSON.parse(decoded);
|
||||
return parsed as IBuilderQuery['groupBy'];
|
||||
}
|
||||
return [];
|
||||
});
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
/* eslint-disable prefer-destructuring */
|
||||
|
||||
import { useMemo } from 'react';
|
||||
import * as Sentry from '@sentry/react';
|
||||
import { Color } from '@signozhq/design-tokens';
|
||||
import { Table, Tooltip, Typography } from 'antd';
|
||||
import { Progress } from 'antd/lib';
|
||||
@@ -261,19 +260,6 @@ export const filterDuplicateFilters = (
|
||||
return uniqueFilters;
|
||||
};
|
||||
|
||||
export const safeParseJSON = <T,>(value: string): T | null => {
|
||||
if (!value) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return JSON.parse(value) as T;
|
||||
} catch (e) {
|
||||
console.error('Error parsing JSON from URL parameter:', e);
|
||||
// TODO: Should we capture this error in Sentry?
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
export const getOrderByFromParams = (
|
||||
searchParams: URLSearchParams,
|
||||
returnNullAsDefault = false,
|
||||
@@ -285,12 +271,9 @@ export const getOrderByFromParams = (
|
||||
INFRA_MONITORING_K8S_PARAMS_KEYS.ORDER_BY,
|
||||
);
|
||||
if (orderByFromParams) {
|
||||
const parsed = safeParseJSON<{ columnName: string; order: 'asc' | 'desc' }>(
|
||||
orderByFromParams,
|
||||
);
|
||||
if (parsed) {
|
||||
return parsed;
|
||||
}
|
||||
const decoded = decodeURIComponent(orderByFromParams);
|
||||
const parsed = JSON.parse(decoded);
|
||||
return parsed as { columnName: string; order: 'asc' | 'desc' };
|
||||
}
|
||||
if (returnNullAsDefault) {
|
||||
return null;
|
||||
@@ -304,7 +287,13 @@ export const getFiltersFromParams = (
|
||||
): IBuilderQuery['filters'] | null => {
|
||||
const filtersFromParams = searchParams.get(queryKey);
|
||||
if (filtersFromParams) {
|
||||
return safeParseJSON<IBuilderQuery['filters']>(filtersFromParams);
|
||||
try {
|
||||
const decoded = decodeURIComponent(filtersFromParams);
|
||||
const parsed = JSON.parse(decoded);
|
||||
return parsed as IBuilderQuery['filters'];
|
||||
} catch (error) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
};
|
||||
|
||||
12
pkg/apiserver/signozapiserver/logspipeline.go
Normal file
12
pkg/apiserver/signozapiserver/logspipeline.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package signozapiserver
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
func (provider *provider) addLogspipelineRoutes(router *mux.Router) error {
|
||||
router.HandleFunc("/api/v2/pipelines", provider.logspipelineHandler.ListPipelines).Methods(http.MethodGet)
|
||||
return nil
|
||||
}
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/fields"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||
@@ -48,6 +49,7 @@ type provider struct {
|
||||
authzHandler authz.Handler
|
||||
zeusHandler zeus.Handler
|
||||
querierHandler querier.Handler
|
||||
logspipelineHandler logspipeline.Handler
|
||||
}
|
||||
|
||||
func NewFactory(
|
||||
@@ -69,6 +71,7 @@ func NewFactory(
|
||||
authzHandler authz.Handler,
|
||||
zeusHandler zeus.Handler,
|
||||
querierHandler querier.Handler,
|
||||
logspipelineHandler logspipeline.Handler,
|
||||
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
|
||||
return newProvider(
|
||||
@@ -93,6 +96,7 @@ func NewFactory(
|
||||
authzHandler,
|
||||
zeusHandler,
|
||||
querierHandler,
|
||||
logspipelineHandler,
|
||||
)
|
||||
})
|
||||
}
|
||||
@@ -119,6 +123,7 @@ func newProvider(
|
||||
authzHandler authz.Handler,
|
||||
zeusHandler zeus.Handler,
|
||||
querierHandler querier.Handler,
|
||||
logspipelineHandler logspipeline.Handler,
|
||||
) (apiserver.APIServer, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
|
||||
router := mux.NewRouter().UseEncodedPath()
|
||||
@@ -143,6 +148,7 @@ func newProvider(
|
||||
authzHandler: authzHandler,
|
||||
zeusHandler: zeusHandler,
|
||||
querierHandler: querierHandler,
|
||||
logspipelineHandler: logspipelineHandler,
|
||||
}
|
||||
|
||||
provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
|
||||
@@ -223,9 +229,14 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := provider.addLogspipelineRoutes(router); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
func newSecuritySchemes(role types.Role) []handler.OpenAPISecurityScheme {
|
||||
return []handler.OpenAPISecurityScheme{
|
||||
{Name: ctxtypes.AuthTypeAPIKey.StringValue(), Scopes: []string{role.String()}},
|
||||
|
||||
91
pkg/modules/logspipeline/impllogspipeline/handler.go
Normal file
91
pkg/modules/logspipeline/impllogspipeline/handler.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package impllogspipeline
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
module logspipeline.Module
|
||||
}
|
||||
|
||||
func NewHandler(module logspipeline.Module) logspipeline.Handler {
|
||||
return &handler{module: module}
|
||||
}
|
||||
|
||||
func (h *handler) ListPipelines(w http.ResponseWriter, r *http.Request) {
|
||||
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID, errv2 := valuer.NewUUID(claims.OrgID)
|
||||
if errv2 != nil {
|
||||
render.Error(w, errv2)
|
||||
return
|
||||
}
|
||||
|
||||
version, err := ParseAgentConfigVersion(r)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
if version != -1 {
|
||||
pipelines, err := h.module.ListPipelinesByVersion(r.Context(), orgID, version)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
render.Success(w, http.StatusOK, pipelines)
|
||||
return
|
||||
}
|
||||
|
||||
pipelines, err := h.module.ListPipelines(r.Context(), orgID)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(w, http.StatusOK, pipelines)
|
||||
}
|
||||
|
||||
func (h *handler) GetPipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *handler) CreatePipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *handler) UpdatePipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *handler) DeletePipeline(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func ParseAgentConfigVersion(r *http.Request) (int, error) {
|
||||
versionString := mux.Vars(r)["version"]
|
||||
|
||||
if versionString == "latest" {
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
version64, err := strconv.ParseInt(versionString, 0, 8)
|
||||
|
||||
if err != nil {
|
||||
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
if version64 <= 0 {
|
||||
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
return int(version64), nil
|
||||
}
|
||||
230
pkg/modules/logspipeline/impllogspipeline/module.go
Normal file
230
pkg/modules/logspipeline/impllogspipeline/module.go
Normal file
@@ -0,0 +1,230 @@
|
||||
package impllogspipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/uptrace/bun"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type module struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewModule(sqlstore sqlstore.SQLStore) logspipeline.Module {
|
||||
return &module{sqlstore: sqlstore}
|
||||
}
|
||||
|
||||
func (m *module) ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error) {
|
||||
latestVersion := -1
|
||||
// get latest agent config
|
||||
lastestConfig, err := agentConf.GetLatestVersion(ctx, orgID, opamptypes.ElementTypeLogPipelines)
|
||||
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if lastestConfig != nil {
|
||||
latestVersion = lastestConfig.Version
|
||||
}
|
||||
return m.ListPipelinesByVersion(ctx, orgID, latestVersion)
|
||||
}
|
||||
|
||||
func (m *module) ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error) {
|
||||
var stored []pipelinetypes.StoreablePipeline
|
||||
err := m.sqlstore.BunDB().NewSelect().
|
||||
Model(&stored).
|
||||
Join("JOIN agent_config_element e ON p.id = e.element_id").
|
||||
Join("JOIN agent_config_version v ON v.id = e.version_id").
|
||||
Where("e.element_type = ?", opamptypes.ElementTypeLogPipelines.StringValue()).
|
||||
Where("v.version = ?", version).
|
||||
Where("v.org_id = ?", orgID.StringValue()).
|
||||
Order("p.order_id ASC").
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pipelines := make([]pipelinetypes.GettablePipeline, len(stored))
|
||||
if len(stored) == 0 {
|
||||
return pipelines, nil
|
||||
}
|
||||
|
||||
for i := range stored {
|
||||
pipelines[i].StoreablePipeline = stored[i]
|
||||
if err := pipelines[i].ParseRawConfig(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := pipelines[i].ParseFilter(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return pipelines, nil
|
||||
}
|
||||
|
||||
func (m *module) GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *module) CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
|
||||
storeable, err := pipeline.ToStoreablePipeline()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// regenerate the id and set other fields
|
||||
storeable.Identifiable.ID = valuer.GenerateUUID()
|
||||
storeable.OrgID = orgID.String()
|
||||
storeable.TimeAuditable = types.TimeAuditable{
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
storeable.UserAuditable = types.UserAuditable{
|
||||
CreatedBy: claims.Email,
|
||||
}
|
||||
|
||||
_, err = m.sqlstore.BunDB().NewInsert().
|
||||
Model(&storeable).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
zap.L().Error("error in inserting pipeline data", zap.Error(err))
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to insert pipeline")
|
||||
}
|
||||
|
||||
return &pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: *storeable,
|
||||
Filter: pipeline.Filter,
|
||||
Config: pipeline.Config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *module) UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
|
||||
if err := pipeline.IsValid(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
storeable, err := pipeline.ToStoreablePipeline()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
storeable.OrgID = orgID.String()
|
||||
storeable.TimeAuditable = types.TimeAuditable{
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
storeable.UserAuditable = types.UserAuditable{
|
||||
UpdatedBy: claims.Email,
|
||||
}
|
||||
|
||||
// get id from storeable pipeline
|
||||
id := storeable.ID.StringValue()
|
||||
|
||||
// depending on the order_id update the rest of the table
|
||||
// example 1: total available pipelines are 6, and order_id 5 is moved to 2, then we need to update the rest of the table
|
||||
// old: 1, 2, 3, 4, 5, 6
|
||||
// ^ |
|
||||
// |_________|
|
||||
// So pipelines starting from 2nd position till 4th position shift to right (or increase their order_id) by 1 position
|
||||
// example 2: total available pipelines are 6, and order_id 2 is moved to 4, then we need to update the rest of the table
|
||||
// old: 1, 2, 3, 4, 5, 6
|
||||
// | ^
|
||||
// |_____|
|
||||
// So pipelines starting from 3rd position till 4th position shift to left (or decrease their order_id) by 1 position
|
||||
if err := m.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
|
||||
db := m.sqlstore.BunDBCtx(ctx)
|
||||
|
||||
var existing pipelinetypes.StoreablePipeline
|
||||
if err := db.NewSelect().
|
||||
Model(&existing).
|
||||
Where("id = ?", id).
|
||||
Where("org_id = ?", orgID.StringValue()).
|
||||
Scan(ctx); err != nil {
|
||||
return m.sqlstore.WrapNotFoundErrf(
|
||||
err,
|
||||
errors.CodeNotFound,
|
||||
"pipeline with id %s does not exist in org %s",
|
||||
id,
|
||||
orgID.StringValue(),
|
||||
)
|
||||
}
|
||||
|
||||
oldOrderID := existing.OrderID
|
||||
newOrderID := storeable.OrderID
|
||||
|
||||
// Reorder other pipelines if the order has changed.
|
||||
if newOrderID != oldOrderID {
|
||||
if err := reorderPipelinesInTx(ctx, db, orgID.StringValue(), oldOrderID, newOrderID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve primary key and immutable fields.
|
||||
storeable.ID = existing.ID
|
||||
|
||||
// Persist the updated pipeline (including its new order).
|
||||
if _, err := db.NewUpdate().
|
||||
Model(storeable).
|
||||
Where("id = ?", id).
|
||||
Where("org_id = ?", orgID.StringValue()).
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: *storeable,
|
||||
Filter: pipeline.Filter,
|
||||
Config: pipeline.Config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// reorderPipelinesInTx updates order_id of other pipelines in a transaction-aware way.
|
||||
// It assumes that all pipelines for a given org have consecutive order_id values starting from 1.
|
||||
// The logic is:
|
||||
// - When moving a pipeline from a higher position to a lower position (e.g., 5 -> 2),
|
||||
// all pipelines in [newOrderID, oldOrderID) are shifted right by +1.
|
||||
// - When moving from a lower position to a higher position (e.g., 2 -> 4),
|
||||
// all pipelines in (oldOrderID, newOrderID] are shifted left by -1.
|
||||
func reorderPipelinesInTx(ctx context.Context, tx bun.IDB, orgID string, oldOrderID, newOrderID int) error {
|
||||
switch {
|
||||
case newOrderID < oldOrderID:
|
||||
// Move up: shift affected pipelines down (order_id + 1).
|
||||
_, err := tx.NewUpdate().
|
||||
Model((*pipelinetypes.StoreablePipeline)(nil)).
|
||||
Set("order_id = order_id + 1").
|
||||
Where("org_id = ?", orgID).
|
||||
Where("order_id >= ?", newOrderID).
|
||||
Where("order_id < ?", oldOrderID).
|
||||
Exec(ctx)
|
||||
return err
|
||||
case newOrderID > oldOrderID:
|
||||
// Move down: shift affected pipelines up (order_id - 1).
|
||||
_, err := tx.NewUpdate().
|
||||
Model((*pipelinetypes.StoreablePipeline)(nil)).
|
||||
Set("order_id = order_id - 1").
|
||||
Where("org_id = ?", orgID).
|
||||
Where("order_id > ?", oldOrderID).
|
||||
Where("order_id <= ?", newOrderID).
|
||||
Exec(ctx)
|
||||
return err
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (m *module) DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error {
|
||||
return nil
|
||||
}
|
||||
27
pkg/modules/logspipeline/logspipeline.go
Normal file
27
pkg/modules/logspipeline/logspipeline.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package logspipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Module interface {
|
||||
ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error)
|
||||
ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error)
|
||||
GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error)
|
||||
CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
|
||||
UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
|
||||
DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error
|
||||
}
|
||||
|
||||
type Handler interface {
|
||||
ListPipelines(w http.ResponseWriter, r *http.Request)
|
||||
GetPipeline(w http.ResponseWriter, r *http.Request)
|
||||
CreatePipeline(w http.ResponseWriter, r *http.Request)
|
||||
UpdatePipeline(w http.ResponseWriter, r *http.Request)
|
||||
DeletePipeline(w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
@@ -120,8 +120,6 @@ func FilterResponse(results []*qbtypes.QueryRangeResponse) []*qbtypes.QueryRange
|
||||
}
|
||||
}
|
||||
resultData.Rows = filteredRows
|
||||
case *qbtypes.ScalarData:
|
||||
resultData.Data = filterScalarDataIPs(resultData.Columns, resultData.Data)
|
||||
}
|
||||
|
||||
filteredData = append(filteredData, result)
|
||||
@@ -147,39 +145,6 @@ func shouldIncludeSeries(series *qbtypes.TimeSeries) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func filterScalarDataIPs(columns []*qbtypes.ColumnDescriptor, data [][]any) [][]any {
|
||||
// Find column indices for server address fields
|
||||
serverColIndices := make([]int, 0)
|
||||
for i, col := range columns {
|
||||
if col.Name == derivedKeyHTTPHost {
|
||||
serverColIndices = append(serverColIndices, i)
|
||||
}
|
||||
}
|
||||
|
||||
if len(serverColIndices) == 0 {
|
||||
return data
|
||||
}
|
||||
|
||||
filtered := make([][]any, 0, len(data))
|
||||
for _, row := range data {
|
||||
includeRow := true
|
||||
for _, colIdx := range serverColIndices {
|
||||
if colIdx < len(row) {
|
||||
if strVal, ok := row[colIdx].(string); ok {
|
||||
if net.ParseIP(strVal) != nil {
|
||||
includeRow = false
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if includeRow {
|
||||
filtered = append(filtered, row)
|
||||
}
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
func shouldIncludeRow(row *qbtypes.RawRow) bool {
|
||||
if row.Data != nil {
|
||||
if domainVal, ok := row.Data[derivedKeyHTTPHost]; ok {
|
||||
|
||||
@@ -117,59 +117,6 @@ func TestFilterResponse(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "should filter out IP addresses from scalar data",
|
||||
input: []*qbtypes.QueryRangeResponse{
|
||||
{
|
||||
Data: qbtypes.QueryData{
|
||||
Results: []any{
|
||||
&qbtypes.ScalarData{
|
||||
QueryName: "endpoints",
|
||||
Columns: []*qbtypes.ColumnDescriptor{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
|
||||
Type: qbtypes.ColumnTypeGroup,
|
||||
},
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
|
||||
Type: qbtypes.ColumnTypeAggregation,
|
||||
},
|
||||
},
|
||||
Data: [][]any{
|
||||
{"192.168.1.1", 10},
|
||||
{"example.com", 20},
|
||||
{"10.0.0.1", 5},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: []*qbtypes.QueryRangeResponse{
|
||||
{
|
||||
Data: qbtypes.QueryData{
|
||||
Results: []any{
|
||||
&qbtypes.ScalarData{
|
||||
QueryName: "endpoints",
|
||||
Columns: []*qbtypes.ColumnDescriptor{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
|
||||
Type: qbtypes.ColumnTypeGroup,
|
||||
},
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
|
||||
Type: qbtypes.ColumnTypeAggregation,
|
||||
},
|
||||
},
|
||||
Data: [][]any{
|
||||
{"example.com", 20},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser"
|
||||
|
||||
@@ -4048,26 +4049,6 @@ func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
|
||||
aH.WriteJSON(w, r, model.GetLogsAggregatesResponse{})
|
||||
}
|
||||
|
||||
func parseAgentConfigVersion(r *http.Request) (int, error) {
|
||||
versionString := mux.Vars(r)["version"]
|
||||
|
||||
if versionString == "latest" {
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
version64, err := strconv.ParseInt(versionString, 0, 8)
|
||||
|
||||
if err != nil {
|
||||
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
if version64 <= 0 {
|
||||
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
|
||||
}
|
||||
|
||||
return int(version64), nil
|
||||
}
|
||||
|
||||
func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
|
||||
req := logparsingpipeline.PipelinesPreviewRequest{}
|
||||
|
||||
@@ -4098,7 +4079,7 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
|
||||
return
|
||||
}
|
||||
|
||||
version, err := parseAgentConfigVersion(r)
|
||||
version, err := impllogspipeline.ParseAgentConfigVersion(r)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
|
||||
@@ -13,6 +13,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
@@ -66,6 +68,7 @@ type Modules struct {
|
||||
SpanPercentile spanpercentile.Module
|
||||
MetricsExplorer metricsexplorer.Module
|
||||
Promote promote.Module
|
||||
LogsPipeline logspipeline.Module
|
||||
}
|
||||
|
||||
func NewModules(
|
||||
@@ -110,5 +113,6 @@ func NewModules(
|
||||
Services: implservices.NewModule(querier, telemetryStore),
|
||||
MetricsExplorer: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, ruleStore, dashboard, providerSettings, config.MetricsExplorer),
|
||||
Promote: implpromote.NewModule(telemetryMetadataStore, telemetryStore),
|
||||
LogsPipeline: impllogspipeline.NewModule(sqlstore),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/fields"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||
@@ -59,6 +60,7 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
|
||||
struct{ authz.Handler }{},
|
||||
struct{ zeus.Handler }{},
|
||||
struct{ querier.Handler }{},
|
||||
struct{ logspipeline.Handler }{},
|
||||
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/global/signozglobal"
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
|
||||
@@ -254,6 +255,7 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
|
||||
handlers.AuthzHandler,
|
||||
handlers.ZeusHandler,
|
||||
handlers.QuerierHandler,
|
||||
impllogspipeline.NewHandler(modules.LogsPipeline),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
@@ -266,6 +267,37 @@ func (p *PostablePipeline) IsValid() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PostablePipeline) ToStoreablePipeline() (*StoreablePipeline, error) {
|
||||
rawConfig, err := json.Marshal(p.Config)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to unmarshal postable pipeline config")
|
||||
}
|
||||
|
||||
filter, err := json.Marshal(p.Filter)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to marshal postable pipeline filter")
|
||||
}
|
||||
identifier := valuer.GenerateUUID()
|
||||
if p.ID != "" {
|
||||
identifier, err = valuer.NewUUID(p.ID)
|
||||
if err != nil {
|
||||
return nil, errors.WithAdditionalf(err, "failed to parse postable pipeline id")
|
||||
}
|
||||
}
|
||||
return &StoreablePipeline{
|
||||
Identifiable: types.Identifiable{
|
||||
ID: identifier,
|
||||
},
|
||||
OrderID: p.OrderID,
|
||||
Enabled: p.Enabled,
|
||||
Name: p.Name,
|
||||
Alias: p.Alias,
|
||||
Description: p.Description,
|
||||
FilterString: string(filter),
|
||||
ConfigJSON: string(rawConfig),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func isValidOperator(op PipelineOperator) error {
|
||||
if op.ID == "" {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "PipelineOperator.ID is required")
|
||||
|
||||
Reference in New Issue
Block a user