Compare commits

..

18 Commits

Author SHA1 Message Date
Nikhil Mantri
38d334b9aa Merge branch 'main' into feat/update_hosts_list_error_messaging 2026-02-24 20:36:14 +05:30
Vinicius Lourenço
cb1a2a8a13 perf(bundle-size): lazy load pages to reduce main bundle size (#10230)
Some checks are pending
build-staging / prepare (push) Waiting to run
build-staging / js-build (push) Blocked by required conditions
build-staging / go-build (push) Blocked by required conditions
build-staging / staging (push) Blocked by required conditions
Release Drafter / update_release_draft (push) Waiting to run
2026-02-24 10:41:40 +00:00
Nikhil Soni
1a5d37b25a fix: add missing filtering for ip address for scalar data (#10264)
* fix: add missing filtering for ip address for scalar data

In domain listing api for external api monitoring,
we have option to filter out the IP address but
it only handles timeseries and raw type data while
domain list handler returns scalar data.

* fix: switch to new derived attributes for ip filtering

---------

Co-authored-by: Nityananda Gohain <nityanandagohain@gmail.com>
2026-02-24 10:26:10 +00:00
Piyush Singariya
bc4273f2f8 chore: test clickhouse version 25.12.5 (#10402) 2026-02-24 14:55:51 +05:30
Nikhil Mantri
3f27e49eac Merge branch 'main' into feat/update_hosts_list_error_messaging 2026-02-20 12:57:03 +05:30
nikhilmantri0902
0b8e87ec96 chore: added test case and final comment resolve 2026-02-19 16:51:58 +05:30
nikhilmantri0902
df2916bf7f chore: review comments 1 2026-02-19 16:15:30 +05:30
Nikhil Mantri
0c62c075f9 Merge branch 'main' into feat/update_hosts_list_error_messaging 2026-02-19 15:54:14 +05:30
nikhilmantri0902
f42d95d5f5 chore: title updated for no host metrics found 2026-02-18 12:32:32 +05:30
nikhilmantri0902
32c0dfa28f chore: title for end time before earliest metadata time 2026-02-18 12:25:09 +05:30
Nikhil Mantri
68e831c4b0 Merge branch 'main' into feat/update_hosts_list_error_messaging 2026-02-17 15:51:27 +05:30
Nikhil Mantri
ed79d40492 Merge branch 'main' into feat/update_hosts_list_error_messaging 2026-02-17 14:16:13 +05:30
nikhilmantri0902
ad8223c792 chore: refactor and conditions combine 2026-02-17 13:46:55 +05:30
nikhilmantri0902
5d691476b1 chore: rearrangement 2026-02-17 12:46:38 +05:30
nikhilmantri0902
358977e203 chore: frontend messaging test fix 2026-02-16 14:37:03 +05:30
nikhilmantri0902
7ffb3e30b5 chore: improved messaging and comment 2026-02-16 14:19:12 +05:30
nikhilmantri0902
ded7b78360 chore: use named query 2026-02-16 13:43:43 +05:30
nikhilmantri0902
76e9074ca7 chore: initial logical changes 2026-02-15 19:04:44 +05:30
25 changed files with 485 additions and 552 deletions

View File

@@ -54,7 +54,7 @@ jobs:
- sqlite
clickhouse-version:
- 25.5.6
- 25.10.5
- 25.12.5
schema-migrator-version:
- v0.142.0
postgres-version:

View File

@@ -308,3 +308,15 @@ export const PublicDashboardPage = Loadable(
/* webpackChunkName: "Public Dashboard Page" */ 'pages/PublicDashboard'
),
);
export const AlertTypeSelectionPage = Loadable(
() =>
import(
/* webpackChunkName: "Alert Type Selection Page" */ 'pages/AlertTypeSelection'
),
);
export const MeterExplorerPage = Loadable(
() =>
import(/* webpackChunkName: "Meter Explorer Page" */ 'pages/MeterExplorer'),
);

View File

@@ -1,12 +1,10 @@
import { RouteProps } from 'react-router-dom';
import ROUTES from 'constants/routes';
import AlertTypeSelectionPage from 'pages/AlertTypeSelection';
import MessagingQueues from 'pages/MessagingQueues';
import MeterExplorer from 'pages/MeterExplorer';
import {
AlertHistory,
AlertOverview,
AlertTypeSelectionPage,
AllAlertChannels,
AllErrors,
ApiMonitoring,
@@ -29,6 +27,8 @@ import {
LogsExplorer,
LogsIndexToFields,
LogsSaveViews,
MessagingQueuesMainPage,
MeterExplorerPage,
MetricsExplorer,
OldLogsExplorer,
Onboarding,
@@ -399,28 +399,28 @@ const routes: AppRoutes[] = [
{
path: ROUTES.MESSAGING_QUEUES_KAFKA,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_KAFKA',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_CELERY_TASK,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_CELERY_TASK',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_OVERVIEW,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_OVERVIEW',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_KAFKA_DETAIL,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_KAFKA_DETAIL',
isPrivate: true,
},
@@ -463,21 +463,21 @@ const routes: AppRoutes[] = [
{
path: ROUTES.METER,
exact: true,
component: MeterExplorer,
component: MeterExplorerPage,
key: 'METER',
isPrivate: true,
},
{
path: ROUTES.METER_EXPLORER,
exact: true,
component: MeterExplorer,
component: MeterExplorerPage,
key: 'METER_EXPLORER',
isPrivate: true,
},
{
path: ROUTES.METER_EXPLORER_VIEWS,
exact: true,
component: MeterExplorer,
component: MeterExplorerPage,
key: 'METER_EXPLORER_VIEWS',
isPrivate: true,
},

View File

@@ -50,6 +50,7 @@ export interface HostListResponse {
total: number;
sentAnyHostMetricsData: boolean;
isSendingK8SAgentMetrics: boolean;
endTimeBeforeRetention: boolean;
};
}

View File

@@ -1,4 +1,4 @@
import { useCallback, useMemo } from 'react';
import React, { useCallback, useMemo } from 'react';
import { LoadingOutlined } from '@ant-design/icons';
import {
Skeleton,
@@ -14,12 +14,93 @@ import { InfraMonitoringEvents } from 'constants/events';
import HostsEmptyOrIncorrectMetrics from './HostsEmptyOrIncorrectMetrics';
import {
EmptyOrLoadingViewProps,
formatDataForTable,
getHostsListColumns,
HostRowData,
HostsListTableProps,
} from './utils';
function EmptyOrLoadingView(
viewState: EmptyOrLoadingViewProps,
): React.ReactNode {
const { isError, errorMessage } = viewState;
if (isError) {
return <Typography>{errorMessage || 'Something went wrong'}</Typography>;
}
if (viewState.showHostsEmptyState) {
return (
<HostsEmptyOrIncorrectMetrics
noData={!viewState.sentAnyHostMetricsData}
incorrectData={viewState.isSendingIncorrectK8SAgentMetrics}
/>
);
}
if (viewState.showEndTimeBeforeRetentionMessage) {
return (
<div className="hosts-empty-state-container">
<div className="hosts-empty-state-container-content">
<img className="eyes-emoji" src="/Images/eyesEmoji.svg" alt="eyes emoji" />
<div className="no-hosts-message">
<Typography.Title level={5} className="no-hosts-message-title">
Queried time range is before earliest host metrics
</Typography.Title>
<Typography.Text className="no-hosts-message-text">
Your requested end time is earlier than the earliest detected time of
host metrics data, please adjust your end time.
</Typography.Text>
</div>
</div>
</div>
);
}
if (viewState.showNoRecordsInSelectedTimeRangeMessage) {
return (
<div className="no-filtered-hosts-message-container">
<div className="no-filtered-hosts-message-content">
<img
src="/Icons/emptyState.svg"
alt="thinking-emoji"
className="empty-state-svg"
/>
<Typography.Title level={5} className="no-filtered-hosts-title">
No host metrics found
</Typography.Title>
<Typography.Text className="no-filtered-hosts-message">
No host metrics in the selected time range and filters. Please adjust your
time range or filters.
</Typography.Text>
</div>
</div>
);
}
if (viewState.showTableLoadingState) {
return (
<div className="hosts-list-loading-state">
<Skeleton.Input
className="hosts-list-loading-state-item"
size="large"
block
active
/>
<Skeleton.Input
className="hosts-list-loading-state-item"
size="large"
block
active
/>
<Skeleton.Input
className="hosts-list-loading-state-item"
size="large"
block
active
/>
</div>
);
}
return null;
}
export default function HostsListTable({
isLoading,
isFetching,
@@ -46,6 +127,11 @@ export default function HostsListTable({
[data],
);
const endTimeBeforeRetention = useMemo(
() => data?.payload?.data?.endTimeBeforeRetention || false,
[data],
);
const formattedHostMetricsData = useMemo(
() => formatDataForTable(hostMetricsData),
[hostMetricsData],
@@ -84,12 +170,6 @@ export default function HostsListTable({
});
};
const showNoFilteredHostsMessage =
!isFetching &&
!isLoading &&
formattedHostMetricsData.length === 0 &&
filters.items.length > 0;
const showHostsEmptyState =
!isFetching &&
!isLoading &&
@@ -97,63 +177,36 @@ export default function HostsListTable({
(!sentAnyHostMetricsData || isSendingIncorrectK8SAgentMetrics) &&
!filters.items.length;
const showEndTimeBeforeRetentionMessage =
!isFetching &&
!isLoading &&
formattedHostMetricsData.length === 0 &&
endTimeBeforeRetention &&
!filters.items.length;
const showNoRecordsInSelectedTimeRangeMessage =
!isFetching &&
!isLoading &&
formattedHostMetricsData.length === 0 &&
!showEndTimeBeforeRetentionMessage &&
!showHostsEmptyState;
const showTableLoadingState =
(isLoading || isFetching) && formattedHostMetricsData.length === 0;
if (isError) {
return <Typography>{data?.error || 'Something went wrong'}</Typography>;
}
const emptyOrLoadingView = EmptyOrLoadingView({
isError,
errorMessage: data?.error ?? '',
showHostsEmptyState,
sentAnyHostMetricsData,
isSendingIncorrectK8SAgentMetrics,
showEndTimeBeforeRetentionMessage,
showNoRecordsInSelectedTimeRangeMessage,
showTableLoadingState,
});
if (showHostsEmptyState) {
return (
<HostsEmptyOrIncorrectMetrics
noData={!sentAnyHostMetricsData}
incorrectData={isSendingIncorrectK8SAgentMetrics}
/>
);
}
if (showNoFilteredHostsMessage) {
return (
<div className="no-filtered-hosts-message-container">
<div className="no-filtered-hosts-message-content">
<img
src="/Icons/emptyState.svg"
alt="thinking-emoji"
className="empty-state-svg"
/>
<Typography.Text className="no-filtered-hosts-message">
This query had no results. Edit your query and try again!
</Typography.Text>
</div>
</div>
);
}
if (showTableLoadingState) {
return (
<div className="hosts-list-loading-state">
<Skeleton.Input
className="hosts-list-loading-state-item"
size="large"
block
active
/>
<Skeleton.Input
className="hosts-list-loading-state-item"
size="large"
block
active
/>
<Skeleton.Input
className="hosts-list-loading-state-item"
size="large"
block
active
/>
</div>
);
if (emptyOrLoadingView) {
return <>{emptyOrLoadingView}</>;
}
return (

View File

@@ -1,12 +1,16 @@
/* eslint-disable react/jsx-props-no-spreading */
import { render, screen } from '@testing-library/react';
import { HostData, HostListResponse } from 'api/infraMonitoring/getHostLists';
import { ErrorResponse, SuccessResponse } from 'types/api';
import { DataTypes } from 'types/api/queryBuilder/queryAutocompleteResponse';
import HostsListTable from '../HostsListTable';
import { HostsListTableProps } from '../utils';
const EMPTY_STATE_CONTAINER_CLASS = '.hosts-empty-state-container';
describe('HostsListTable', () => {
const mockHost = {
const createMockHost = (): HostData =>
({
hostName: 'test-host-1',
active: true,
cpu: 0.75,
@@ -14,20 +18,46 @@ describe('HostsListTable', () => {
wait: 0.03,
load15: 1.5,
os: 'linux',
};
cpuTimeSeries: { labels: {}, labelsArray: [], values: [] },
memoryTimeSeries: { labels: {}, labelsArray: [], values: [] },
waitTimeSeries: { labels: {}, labelsArray: [], values: [] },
load15TimeSeries: { labels: {}, labelsArray: [], values: [] },
} as HostData);
const mockTableData = {
const createMockTableData = (
overrides: Partial<HostListResponse['data']> = {},
): SuccessResponse<HostListResponse> => {
const mockHost = createMockHost();
return {
statusCode: 200,
message: 'Success',
error: null,
payload: {
status: 'success',
data: {
hosts: [mockHost],
type: 'list',
records: [mockHost],
groups: null,
total: 1,
sentAnyHostMetricsData: true,
isSendingK8SAgentMetrics: false,
endTimeBeforeRetention: false,
...overrides,
},
},
};
};
describe('HostsListTable', () => {
const mockHost = createMockHost();
const mockTableData = createMockTableData();
const mockOnHostClick = jest.fn();
const mockSetCurrentPage = jest.fn();
const mockSetOrderBy = jest.fn();
const mockSetPageSize = jest.fn();
const mockProps = {
const mockProps: HostsListTableProps = {
isLoading: false,
isError: false,
isFetching: false,
@@ -43,7 +73,7 @@ describe('HostsListTable', () => {
pageSize: 10,
setOrderBy: mockSetOrderBy,
setPageSize: mockSetPageSize,
} as any;
};
it('renders loading state if isLoading is true and tableData is empty', () => {
const { container } = render(
@@ -51,7 +81,7 @@ describe('HostsListTable', () => {
{...mockProps}
isLoading
hostMetricsData={[]}
tableData={{ payload: { data: { hosts: [] } } }}
tableData={createMockTableData({ records: [] })}
/>,
);
expect(container.querySelector('.hosts-list-loading-state')).toBeTruthy();
@@ -63,7 +93,7 @@ describe('HostsListTable', () => {
{...mockProps}
isFetching
hostMetricsData={[]}
tableData={{ payload: { data: { hosts: [] } } }}
tableData={createMockTableData({ records: [] })}
/>,
);
expect(container.querySelector('.hosts-list-loading-state')).toBeTruthy();
@@ -74,19 +104,56 @@ describe('HostsListTable', () => {
expect(screen.getByText('Something went wrong')).toBeTruthy();
});
it('renders "Something went wrong" fallback when isError is true and error message is empty', () => {
const tableDataWithEmptyError: ErrorResponse = {
statusCode: 500,
payload: null,
error: '',
message: null,
};
render(
<HostsListTable
{...mockProps}
isError
hostMetricsData={[]}
tableData={tableDataWithEmptyError}
/>,
);
expect(screen.getByText('Something went wrong')).toBeInTheDocument();
});
it('renders custom error message when isError is true and error message is provided', () => {
const customErrorMessage = 'Failed to fetch host metrics';
const tableDataWithError: ErrorResponse = {
statusCode: 500,
payload: null,
error: customErrorMessage,
message: null,
};
render(
<HostsListTable
{...mockProps}
isError
hostMetricsData={[]}
tableData={tableDataWithError}
/>,
);
expect(screen.getByText(customErrorMessage)).toBeInTheDocument();
});
it('renders empty state if no hosts are found', () => {
const { container } = render(
<HostsListTable
{...mockProps}
hostMetricsData={[]}
tableData={{
payload: {
data: { hosts: [] },
},
}}
tableData={createMockTableData({
records: [],
})}
/>,
);
expect(container.querySelector(EMPTY_STATE_CONTAINER_CLASS)).toBeTruthy();
expect(
container.querySelector('.no-filtered-hosts-message-container'),
).toBeTruthy();
});
it('renders empty state if sentAnyHostMetricsData is false', () => {
@@ -94,58 +161,114 @@ describe('HostsListTable', () => {
<HostsListTable
{...mockProps}
hostMetricsData={[]}
tableData={{
...mockTableData,
payload: {
...mockTableData.payload,
data: {
...mockTableData.payload.data,
sentAnyHostMetricsData: false,
hosts: [],
},
},
}}
tableData={createMockTableData({
sentAnyHostMetricsData: false,
records: [],
})}
/>,
);
expect(container.querySelector(EMPTY_STATE_CONTAINER_CLASS)).toBeTruthy();
});
it('renders empty state if isSendingIncorrectK8SAgentMetrics is true', () => {
it('renders empty state if isSendingK8SAgentMetrics is true', () => {
const { container } = render(
<HostsListTable
{...mockProps}
hostMetricsData={[]}
tableData={{
...mockTableData,
payload: {
...mockTableData.payload,
data: {
...mockTableData.payload.data,
isSendingIncorrectK8SAgentMetrics: true,
hosts: [],
},
},
}}
tableData={createMockTableData({
isSendingK8SAgentMetrics: true,
records: [],
})}
/>,
);
expect(container.querySelector(EMPTY_STATE_CONTAINER_CLASS)).toBeTruthy();
});
it('renders end time before retention message when endTimeBeforeRetention is true', () => {
const { container } = render(
<HostsListTable
{...mockProps}
hostMetricsData={[]}
tableData={createMockTableData({
sentAnyHostMetricsData: true,
isSendingK8SAgentMetrics: false,
endTimeBeforeRetention: true,
records: [],
})}
/>,
);
expect(container.querySelector(EMPTY_STATE_CONTAINER_CLASS)).toBeTruthy();
expect(
screen.getByText(
/Your requested end time is earlier than the earliest detected time of host metrics data, please adjust your end time\./,
),
).toBeInTheDocument();
});
it('renders no records message when noRecordsInSelectedTimeRangeAndFilters is true', () => {
const { container } = render(
<HostsListTable
{...mockProps}
hostMetricsData={[]}
tableData={createMockTableData({
sentAnyHostMetricsData: true,
isSendingK8SAgentMetrics: false,
records: [],
})}
/>,
);
expect(
container.querySelector('.no-filtered-hosts-message-container'),
).toBeTruthy();
expect(
screen.getByText(/No host metrics in the selected time range and filters/),
).toBeInTheDocument();
});
it('renders no filtered hosts message when filters are present and no hosts are found', () => {
const { container } = render(
<HostsListTable
{...mockProps}
hostMetricsData={[]}
filters={{
items: [
{
id: 'host_name',
key: {
key: 'host_name',
dataType: DataTypes.String,
type: 'tag',
isIndexed: true,
},
op: '=',
value: 'unknown',
},
],
op: 'AND',
}}
tableData={createMockTableData({
sentAnyHostMetricsData: true,
isSendingK8SAgentMetrics: false,
records: [],
})}
/>,
);
expect(container.querySelector('.no-filtered-hosts-message')).toBeTruthy();
expect(
screen.getByText(
/No host metrics in the selected time range and filters\. Please adjust your time range or filters\./,
),
).toBeInTheDocument();
});
it('renders table data', () => {
const { container } = render(
<HostsListTable
{...mockProps}
tableData={{
...mockTableData,
payload: {
...mockTableData.payload,
data: {
...mockTableData.payload.data,
isSendingIncorrectK8SAgentMetrics: false,
sentAnyHostMetricsData: true,
},
},
}}
tableData={createMockTableData({
isSendingK8SAgentMetrics: false,
sentAnyHostMetricsData: true,
})}
/>,
);
expect(container.querySelector('.hosts-list-table')).toBeTruthy();

View File

@@ -107,6 +107,17 @@ export interface HostsListTableProps {
setPageSize: (pageSize: number) => void;
}
export interface EmptyOrLoadingViewProps {
isError: boolean;
errorMessage: string;
showHostsEmptyState: boolean;
sentAnyHostMetricsData: boolean;
isSendingIncorrectK8SAgentMetrics: boolean;
showEndTimeBeforeRetentionMessage: boolean;
showNoRecordsInSelectedTimeRangeMessage: boolean;
showTableLoadingState: boolean;
}
export const getHostListsQuery = (): HostListPayload => ({
filters: {
items: [],

View File

@@ -1,12 +0,0 @@
package signozapiserver
import (
"net/http"
"github.com/gorilla/mux"
)
func (provider *provider) addLogspipelineRoutes(router *mux.Router) error {
router.HandleFunc("/api/v2/pipelines", provider.logspipelineHandler.ListPipelines).Methods(http.MethodGet)
return nil
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
@@ -49,7 +48,6 @@ type provider struct {
authzHandler authz.Handler
zeusHandler zeus.Handler
querierHandler querier.Handler
logspipelineHandler logspipeline.Handler
}
func NewFactory(
@@ -71,7 +69,6 @@ func NewFactory(
authzHandler authz.Handler,
zeusHandler zeus.Handler,
querierHandler querier.Handler,
logspipelineHandler logspipeline.Handler,
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
return newProvider(
@@ -96,7 +93,6 @@ func NewFactory(
authzHandler,
zeusHandler,
querierHandler,
logspipelineHandler,
)
})
}
@@ -123,7 +119,6 @@ func newProvider(
authzHandler authz.Handler,
zeusHandler zeus.Handler,
querierHandler querier.Handler,
logspipelineHandler logspipeline.Handler,
) (apiserver.APIServer, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
router := mux.NewRouter().UseEncodedPath()
@@ -148,7 +143,6 @@ func newProvider(
authzHandler: authzHandler,
zeusHandler: zeusHandler,
querierHandler: querierHandler,
logspipelineHandler: logspipelineHandler,
}
provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
@@ -229,14 +223,9 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
return err
}
if err := provider.addLogspipelineRoutes(router); err != nil {
return err
}
return nil
}
func newSecuritySchemes(role types.Role) []handler.OpenAPISecurityScheme {
return []handler.OpenAPISecurityScheme{
{Name: ctxtypes.AuthTypeAPIKey.StringValue(), Scopes: []string{role.String()}},

View File

@@ -1,91 +0,0 @@
package impllogspipeline
import (
"net/http"
"strconv"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
)
type handler struct {
module logspipeline.Module
}
func NewHandler(module logspipeline.Module) logspipeline.Handler {
return &handler{module: module}
}
func (h *handler) ListPipelines(w http.ResponseWriter, r *http.Request) {
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
orgID, errv2 := valuer.NewUUID(claims.OrgID)
if errv2 != nil {
render.Error(w, errv2)
return
}
version, err := ParseAgentConfigVersion(r)
if err != nil {
render.Error(w, err)
return
}
if version != -1 {
pipelines, err := h.module.ListPipelinesByVersion(r.Context(), orgID, version)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, pipelines)
return
}
pipelines, err := h.module.ListPipelines(r.Context(), orgID)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, pipelines)
}
func (h *handler) GetPipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) CreatePipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) UpdatePipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) DeletePipeline(w http.ResponseWriter, r *http.Request) {
}
func ParseAgentConfigVersion(r *http.Request) (int, error) {
versionString := mux.Vars(r)["version"]
if versionString == "latest" {
return -1, nil
}
version64, err := strconv.ParseInt(versionString, 0, 8)
if err != nil {
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
}
if version64 <= 0 {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
}
return int(version64), nil
}

View File

@@ -1,230 +0,0 @@
package impllogspipeline
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"go.uber.org/zap"
)
type module struct {
sqlstore sqlstore.SQLStore
}
func NewModule(sqlstore sqlstore.SQLStore) logspipeline.Module {
return &module{sqlstore: sqlstore}
}
func (m *module) ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error) {
latestVersion := -1
// get latest agent config
lastestConfig, err := agentConf.GetLatestVersion(ctx, orgID, opamptypes.ElementTypeLogPipelines)
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
return nil, err
}
if lastestConfig != nil {
latestVersion = lastestConfig.Version
}
return m.ListPipelinesByVersion(ctx, orgID, latestVersion)
}
func (m *module) ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error) {
var stored []pipelinetypes.StoreablePipeline
err := m.sqlstore.BunDB().NewSelect().
Model(&stored).
Join("JOIN agent_config_element e ON p.id = e.element_id").
Join("JOIN agent_config_version v ON v.id = e.version_id").
Where("e.element_type = ?", opamptypes.ElementTypeLogPipelines.StringValue()).
Where("v.version = ?", version).
Where("v.org_id = ?", orgID.StringValue()).
Order("p.order_id ASC").
Scan(ctx)
if err != nil {
return nil, err
}
pipelines := make([]pipelinetypes.GettablePipeline, len(stored))
if len(stored) == 0 {
return pipelines, nil
}
for i := range stored {
pipelines[i].StoreablePipeline = stored[i]
if err := pipelines[i].ParseRawConfig(); err != nil {
return nil, err
}
if err := pipelines[i].ParseFilter(); err != nil {
return nil, err
}
}
return pipelines, nil
}
func (m *module) GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error) {
return nil, nil
}
func (m *module) CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
storeable, err := pipeline.ToStoreablePipeline()
if err != nil {
return nil, err
}
// regenerate the id and set other fields
storeable.Identifiable.ID = valuer.GenerateUUID()
storeable.OrgID = orgID.String()
storeable.TimeAuditable = types.TimeAuditable{
CreatedAt: time.Now(),
}
storeable.UserAuditable = types.UserAuditable{
CreatedBy: claims.Email,
}
_, err = m.sqlstore.BunDB().NewInsert().
Model(&storeable).
Exec(ctx)
if err != nil {
zap.L().Error("error in inserting pipeline data", zap.Error(err))
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to insert pipeline")
}
return &pipelinetypes.GettablePipeline{
StoreablePipeline: *storeable,
Filter: pipeline.Filter,
Config: pipeline.Config,
}, nil
}
func (m *module) UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
if err := pipeline.IsValid(); err != nil {
return nil, err
}
storeable, err := pipeline.ToStoreablePipeline()
if err != nil {
return nil, err
}
storeable.OrgID = orgID.String()
storeable.TimeAuditable = types.TimeAuditable{
UpdatedAt: time.Now(),
}
storeable.UserAuditable = types.UserAuditable{
UpdatedBy: claims.Email,
}
// get id from storeable pipeline
id := storeable.ID.StringValue()
// depending on the order_id update the rest of the table
// example 1: total available pipelines are 6, and order_id 5 is moved to 2, then we need to update the rest of the table
// old: 1, 2, 3, 4, 5, 6
// ^ |
// |_________|
// So pipelines starting from 2nd position till 4th position shift to right (or increase their order_id) by 1 position
// example 2: total available pipelines are 6, and order_id 2 is moved to 4, then we need to update the rest of the table
// old: 1, 2, 3, 4, 5, 6
// | ^
// |_____|
// So pipelines starting from 3rd position till 4th position shift to left (or decrease their order_id) by 1 position
if err := m.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
db := m.sqlstore.BunDBCtx(ctx)
var existing pipelinetypes.StoreablePipeline
if err := db.NewSelect().
Model(&existing).
Where("id = ?", id).
Where("org_id = ?", orgID.StringValue()).
Scan(ctx); err != nil {
return m.sqlstore.WrapNotFoundErrf(
err,
errors.CodeNotFound,
"pipeline with id %s does not exist in org %s",
id,
orgID.StringValue(),
)
}
oldOrderID := existing.OrderID
newOrderID := storeable.OrderID
// Reorder other pipelines if the order has changed.
if newOrderID != oldOrderID {
if err := reorderPipelinesInTx(ctx, db, orgID.StringValue(), oldOrderID, newOrderID); err != nil {
return err
}
}
// Preserve primary key and immutable fields.
storeable.ID = existing.ID
// Persist the updated pipeline (including its new order).
if _, err := db.NewUpdate().
Model(storeable).
Where("id = ?", id).
Where("org_id = ?", orgID.StringValue()).
Exec(ctx); err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return &pipelinetypes.GettablePipeline{
StoreablePipeline: *storeable,
Filter: pipeline.Filter,
Config: pipeline.Config,
}, nil
}
// reorderPipelinesInTx updates order_id of other pipelines in a transaction-aware way.
// It assumes that all pipelines for a given org have consecutive order_id values starting from 1.
// The logic is:
// - When moving a pipeline from a higher position to a lower position (e.g., 5 -> 2),
// all pipelines in [newOrderID, oldOrderID) are shifted right by +1.
// - When moving from a lower position to a higher position (e.g., 2 -> 4),
// all pipelines in (oldOrderID, newOrderID] are shifted left by -1.
func reorderPipelinesInTx(ctx context.Context, tx bun.IDB, orgID string, oldOrderID, newOrderID int) error {
switch {
case newOrderID < oldOrderID:
// Move up: shift affected pipelines down (order_id + 1).
_, err := tx.NewUpdate().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Set("order_id = order_id + 1").
Where("org_id = ?", orgID).
Where("order_id >= ?", newOrderID).
Where("order_id < ?", oldOrderID).
Exec(ctx)
return err
case newOrderID > oldOrderID:
// Move down: shift affected pipelines up (order_id - 1).
_, err := tx.NewUpdate().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Set("order_id = order_id - 1").
Where("org_id = ?", orgID).
Where("order_id > ?", oldOrderID).
Where("order_id <= ?", newOrderID).
Exec(ctx)
return err
default:
return nil
}
}
func (m *module) DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error {
return nil
}

View File

@@ -1,27 +0,0 @@
package logspipeline
import (
"context"
"net/http"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Module interface {
ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error)
ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error)
GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error)
CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error
}
type Handler interface {
ListPipelines(w http.ResponseWriter, r *http.Request)
GetPipeline(w http.ResponseWriter, r *http.Request)
CreatePipeline(w http.ResponseWriter, r *http.Request)
UpdatePipeline(w http.ResponseWriter, r *http.Request)
DeletePipeline(w http.ResponseWriter, r *http.Request)
}

View File

@@ -120,6 +120,8 @@ func FilterResponse(results []*qbtypes.QueryRangeResponse) []*qbtypes.QueryRange
}
}
resultData.Rows = filteredRows
case *qbtypes.ScalarData:
resultData.Data = filterScalarDataIPs(resultData.Columns, resultData.Data)
}
filteredData = append(filteredData, result)
@@ -145,6 +147,39 @@ func shouldIncludeSeries(series *qbtypes.TimeSeries) bool {
return true
}
func filterScalarDataIPs(columns []*qbtypes.ColumnDescriptor, data [][]any) [][]any {
// Find column indices for server address fields
serverColIndices := make([]int, 0)
for i, col := range columns {
if col.Name == derivedKeyHTTPHost {
serverColIndices = append(serverColIndices, i)
}
}
if len(serverColIndices) == 0 {
return data
}
filtered := make([][]any, 0, len(data))
for _, row := range data {
includeRow := true
for _, colIdx := range serverColIndices {
if colIdx < len(row) {
if strVal, ok := row[colIdx].(string); ok {
if net.ParseIP(strVal) != nil {
includeRow = false
break
}
}
}
}
if includeRow {
filtered = append(filtered, row)
}
}
return filtered
}
func shouldIncludeRow(row *qbtypes.RawRow) bool {
if row.Data != nil {
if domainVal, ok := row.Data[derivedKeyHTTPHost]; ok {

View File

@@ -117,6 +117,59 @@ func TestFilterResponse(t *testing.T) {
},
},
},
{
name: "should filter out IP addresses from scalar data",
input: []*qbtypes.QueryRangeResponse{
{
Data: qbtypes.QueryData{
Results: []any{
&qbtypes.ScalarData{
QueryName: "endpoints",
Columns: []*qbtypes.ColumnDescriptor{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
Type: qbtypes.ColumnTypeGroup,
},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
Type: qbtypes.ColumnTypeAggregation,
},
},
Data: [][]any{
{"192.168.1.1", 10},
{"example.com", 20},
{"10.0.0.1", 5},
},
},
},
},
},
},
expected: []*qbtypes.QueryRangeResponse{
{
Data: qbtypes.QueryData{
Results: []any{
&qbtypes.ScalarData{
QueryName: "endpoints",
Columns: []*qbtypes.ColumnDescriptor{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
Type: qbtypes.ColumnTypeGroup,
},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
Type: qbtypes.ColumnTypeAggregation,
},
},
Data: [][]any{
{"example.com", 20},
},
},
},
},
},
},
},
}
for _, tt := range tests {

View File

@@ -4308,6 +4308,28 @@ func (r *ClickHouseReader) GetListResultV3(ctx context.Context, query string) ([
}
// GetHostMetricsExistenceAndEarliestTime returns (count, minFirstReportedUnixMilli, error) for the given host metric names
// from distributed_metadata. When count is 0, minFirstReportedUnixMilli is 0.
func (r *ClickHouseReader) GetHostMetricsExistenceAndEarliestTime(ctx context.Context, metricNames []string) (uint64, uint64, error) {
if len(metricNames) == 0 {
return 0, 0, nil
}
query := fmt.Sprintf(
`SELECT count(*) AS cnt, min(first_reported_unix_milli) AS min_first_reported
FROM %s.%s
WHERE metric_name IN @metric_names`,
constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_METADATA_TABLENAME)
var count, minFirstReported uint64
err := r.db.QueryRow(ctx, query, clickhouse.Named("metric_names", metricNames)).Scan(&count, &minFirstReported)
if err != nil {
zap.L().Error("error getting host metrics existence and earliest time", zap.Error(err))
return 0, 0, err
}
return count, minFirstReported, nil
}
func getPersonalisedError(err error) error {
if err == nil {
return nil

View File

@@ -9,7 +9,6 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
"github.com/SigNoz/signoz/pkg/queryparser"
@@ -4049,6 +4048,26 @@ func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
aH.WriteJSON(w, r, model.GetLogsAggregatesResponse{})
}
func parseAgentConfigVersion(r *http.Request) (int, error) {
versionString := mux.Vars(r)["version"]
if versionString == "latest" {
return -1, nil
}
version64, err := strconv.ParseInt(versionString, 0, 8)
if err != nil {
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
}
if version64 <= 0 {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
}
return int(version64), nil
}
func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
req := logparsingpipeline.PipelinesPreviewRequest{}
@@ -4079,7 +4098,7 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
return
}
version, err := impllogspipeline.ParseAgentConfigVersion(r)
version, err := parseAgentConfigVersion(r)
if err != nil {
render.Error(w, err)
return

View File

@@ -10,6 +10,7 @@ import (
)
var dotMetricMap = map[string]string{
"system_filesystem_usage": "system.filesystem.usage",
"system_cpu_time": "system.cpu.time",
"system_memory_usage": "system.memory.usage",
"system_cpu_load_average_15m": "system.cpu.load_average.15m",

View File

@@ -67,10 +67,11 @@ var (
GetDotMetrics("os_type"),
}
metricNamesForHosts = map[string]string{
"cpu": GetDotMetrics("system_cpu_time"),
"memory": GetDotMetrics("system_memory_usage"),
"load15": GetDotMetrics("system_cpu_load_average_15m"),
"wait": GetDotMetrics("system_cpu_time"),
"filesystem": GetDotMetrics("system_filesystem_usage"),
"cpu": GetDotMetrics("system_cpu_time"),
"memory": GetDotMetrics("system_memory_usage"),
"load15": GetDotMetrics("system_cpu_load_average_15m"),
"wait": GetDotMetrics("system_cpu_time"),
}
)
@@ -316,24 +317,15 @@ func (h *HostsRepo) getTopHostGroups(ctx context.Context, orgID valuer.UUID, req
return topHostGroups, allHostGroups, nil
}
func (h *HostsRepo) DidSendHostMetricsData(ctx context.Context, req model.HostListRequest) (bool, error) {
// GetHostMetricsExistenceAndEarliestTime returns (count, minFirstReportedUnixMilli, error) for host metrics
// in distributed_metadata. Uses metricNamesForHosts plus system.filesystem.usage.
func (h *HostsRepo) GetHostMetricsExistenceAndEarliestTime(ctx context.Context, req model.HostListRequest) (uint64, uint64, error) {
names := []string{}
for _, metricName := range metricNamesForHosts {
names = append(names, metricName)
}
namesStr := "'" + strings.Join(names, "','") + "'"
query := fmt.Sprintf("SELECT count() FROM %s.%s WHERE metric_name IN (%s)",
constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr)
count, err := h.reader.GetCountOfThings(ctx, query)
if err != nil {
return false, err
}
return count > 0, nil
return h.reader.GetHostMetricsExistenceAndEarliestTime(ctx, names)
}
func (h *HostsRepo) IsSendingK8SAgentMetrics(ctx context.Context, req model.HostListRequest) ([]string, []string, error) {
@@ -412,8 +404,25 @@ func (h *HostsRepo) GetHostList(ctx context.Context, orgID valuer.UUID, req mode
resp.ClusterNames = clusterNames
resp.NodeNames = nodeNames
}
if sentAnyHostMetricsData, err := h.DidSendHostMetricsData(ctx, req); err == nil {
resp.SentAnyHostMetricsData = sentAnyHostMetricsData
// 1. Check if any host metrics exist and get earliest retention time
// if no hosts metrics exist, that means we should show the onboarding guide on UI, and return early.
// 2. If host metrics exist, but req.End is earlier than the earliest time of host metrics as read from
// metadata table, then we should convey the same to the user and return early
if count, minFirstReportedUnixMilli, err := h.GetHostMetricsExistenceAndEarliestTime(ctx, req); err == nil {
if count == 0 {
resp.SentAnyHostMetricsData = false
resp.Records = []model.HostListRecord{}
resp.Total = 0
return resp, nil
}
resp.SentAnyHostMetricsData = true
if req.End < int64(minFirstReportedUnixMilli) {
resp.EndTimeBeforeRetention = true
resp.Records = []model.HostListRecord{}
resp.Total = 0
return resp, nil
}
}
step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60))

View File

@@ -125,6 +125,8 @@ const (
SIGNOZ_TIMESERIES_v4_6HRS_TABLENAME = "distributed_time_series_v4_6hrs"
SIGNOZ_ATTRIBUTES_METADATA_TABLENAME = "distributed_attributes_metadata"
SIGNOZ_ATTRIBUTES_METADATA_LOCAL_TABLENAME = "attributes_metadata"
SIGNOZ_METADATA_TABLENAME = "distributed_metadata"
SIGNOZ_METADATA_LOCAL_TABLENAME = "metadata"
)
// alert related constants

View File

@@ -100,6 +100,8 @@ type Reader interface {
GetCountOfThings(ctx context.Context, query string) (uint64, error)
GetHostMetricsExistenceAndEarliestTime(ctx context.Context, metricNames []string) (uint64, uint64, error)
//trace
GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError

View File

@@ -44,6 +44,7 @@ type HostListResponse struct {
IsSendingK8SAgentMetrics bool `json:"isSendingK8SAgentMetrics"`
ClusterNames []string `json:"clusterNames"`
NodeNames []string `json:"nodeNames"`
EndTimeBeforeRetention bool `json:"endTimeBeforeRetention"`
}
func (r *HostListResponse) SortBy(orderBy *v3.OrderBy) {

View File

@@ -13,8 +13,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -68,7 +66,6 @@ type Modules struct {
SpanPercentile spanpercentile.Module
MetricsExplorer metricsexplorer.Module
Promote promote.Module
LogsPipeline logspipeline.Module
}
func NewModules(
@@ -113,6 +110,5 @@ func NewModules(
Services: implservices.NewModule(querier, telemetryStore),
MetricsExplorer: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, ruleStore, dashboard, providerSettings, config.MetricsExplorer),
Promote: implpromote.NewModule(telemetryMetadataStore, telemetryStore),
LogsPipeline: impllogspipeline.NewModule(sqlstore),
}
}

View File

@@ -18,7 +18,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
@@ -60,7 +59,6 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
struct{ authz.Handler }{},
struct{ zeus.Handler }{},
struct{ querier.Handler }{},
struct{ logspipeline.Handler }{},
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
if err != nil {
return nil, err

View File

@@ -23,7 +23,6 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/global/signozglobal"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
@@ -255,7 +254,6 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
handlers.AuthzHandler,
handlers.ZeusHandler,
handlers.QuerierHandler,
impllogspipeline.NewHandler(modules.LogsPipeline),
),
)
}

View File

@@ -10,7 +10,6 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
@@ -267,37 +266,6 @@ func (p *PostablePipeline) IsValid() error {
return nil
}
func (p *PostablePipeline) ToStoreablePipeline() (*StoreablePipeline, error) {
rawConfig, err := json.Marshal(p.Config)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to unmarshal postable pipeline config")
}
filter, err := json.Marshal(p.Filter)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to marshal postable pipeline filter")
}
identifier := valuer.GenerateUUID()
if p.ID != "" {
identifier, err = valuer.NewUUID(p.ID)
if err != nil {
return nil, errors.WithAdditionalf(err, "failed to parse postable pipeline id")
}
}
return &StoreablePipeline{
Identifiable: types.Identifiable{
ID: identifier,
},
OrderID: p.OrderID,
Enabled: p.Enabled,
Name: p.Name,
Alias: p.Alias,
Description: p.Description,
FilterString: string(filter),
ConfigJSON: string(rawConfig),
}, nil
}
func isValidOperator(op PipelineOperator) error {
if op.ID == "" {
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "PipelineOperator.ID is required")