Compare commits

..

2 Commits

Author SHA1 Message Date
Piyush Singariya
ae55a51d66 feat: update module function 2026-02-24 18:13:32 +05:30
Piyush Singariya
023e315a22 fix: setting up foundation of logs pipelines 2026-02-24 17:05:45 +05:30
21 changed files with 495 additions and 197 deletions

View File

@@ -54,7 +54,7 @@ jobs:
- sqlite
clickhouse-version:
- 25.5.6
- 25.12.5
- 25.10.5
schema-migrator-version:
- v0.142.0
postgres-version:

View File

@@ -308,15 +308,3 @@ export const PublicDashboardPage = Loadable(
/* webpackChunkName: "Public Dashboard Page" */ 'pages/PublicDashboard'
),
);
export const AlertTypeSelectionPage = Loadable(
() =>
import(
/* webpackChunkName: "Alert Type Selection Page" */ 'pages/AlertTypeSelection'
),
);
export const MeterExplorerPage = Loadable(
() =>
import(/* webpackChunkName: "Meter Explorer Page" */ 'pages/MeterExplorer'),
);

View File

@@ -1,10 +1,12 @@
import { RouteProps } from 'react-router-dom';
import ROUTES from 'constants/routes';
import AlertTypeSelectionPage from 'pages/AlertTypeSelection';
import MessagingQueues from 'pages/MessagingQueues';
import MeterExplorer from 'pages/MeterExplorer';
import {
AlertHistory,
AlertOverview,
AlertTypeSelectionPage,
AllAlertChannels,
AllErrors,
ApiMonitoring,
@@ -27,8 +29,6 @@ import {
LogsExplorer,
LogsIndexToFields,
LogsSaveViews,
MessagingQueuesMainPage,
MeterExplorerPage,
MetricsExplorer,
OldLogsExplorer,
Onboarding,
@@ -399,28 +399,28 @@ const routes: AppRoutes[] = [
{
path: ROUTES.MESSAGING_QUEUES_KAFKA,
exact: true,
component: MessagingQueuesMainPage,
component: MessagingQueues,
key: 'MESSAGING_QUEUES_KAFKA',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_CELERY_TASK,
exact: true,
component: MessagingQueuesMainPage,
component: MessagingQueues,
key: 'MESSAGING_QUEUES_CELERY_TASK',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_OVERVIEW,
exact: true,
component: MessagingQueuesMainPage,
component: MessagingQueues,
key: 'MESSAGING_QUEUES_OVERVIEW',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_KAFKA_DETAIL,
exact: true,
component: MessagingQueuesMainPage,
component: MessagingQueues,
key: 'MESSAGING_QUEUES_KAFKA_DETAIL',
isPrivate: true,
},
@@ -463,21 +463,21 @@ const routes: AppRoutes[] = [
{
path: ROUTES.METER,
exact: true,
component: MeterExplorerPage,
component: MeterExplorer,
key: 'METER',
isPrivate: true,
},
{
path: ROUTES.METER_EXPLORER,
exact: true,
component: MeterExplorerPage,
component: MeterExplorer,
key: 'METER_EXPLORER',
isPrivate: true,
},
{
path: ROUTES.METER_EXPLORER_VIEWS,
exact: true,
component: MeterExplorerPage,
component: MeterExplorer,
key: 'METER_EXPLORER_VIEWS',
isPrivate: true,
},

View File

@@ -40,7 +40,6 @@ function ValueGraph({
value,
rawValue,
thresholds,
yAxisUnit,
}: ValueGraphProps): JSX.Element {
const { t } = useTranslation(['valueGraph']);
const containerRef = useRef<HTMLDivElement>(null);
@@ -88,7 +87,7 @@ function ValueGraph({
const {
threshold,
isConflictingThresholds,
} = getBackgroundColorAndThresholdCheck(thresholds, rawValue, yAxisUnit);
} = getBackgroundColorAndThresholdCheck(thresholds, rawValue);
return (
<div
@@ -156,7 +155,6 @@ interface ValueGraphProps {
value: string;
rawValue: number;
thresholds: ThresholdProps[];
yAxisUnit?: string;
}
export default ValueGraph;

View File

@@ -1,10 +1,9 @@
import { evaluateThresholdWithConvertedValue } from 'container/GridTableComponent/utils';
import { getYAxisFormattedValue } from 'components/Graph/yAxisConfig';
import { ThresholdProps } from 'container/NewWidget/RightContainer/Threshold/types';
function doesValueSatisfyThreshold(
function compareThreshold(
rawValue: number,
threshold: ThresholdProps,
yAxisUnit?: string,
): boolean {
if (
threshold.thresholdOperator === undefined ||
@@ -12,14 +11,31 @@ function doesValueSatisfyThreshold(
) {
return false;
}
switch (threshold.thresholdOperator) {
case '>':
return rawValue > threshold.thresholdValue;
case '>=':
return rawValue >= threshold.thresholdValue;
case '<':
return rawValue < threshold.thresholdValue;
case '<=':
return rawValue <= threshold.thresholdValue;
case '=':
return rawValue === threshold.thresholdValue;
default:
return false;
}
}
return evaluateThresholdWithConvertedValue(
rawValue,
threshold.thresholdValue,
threshold.thresholdOperator,
threshold.thresholdUnit,
yAxisUnit,
);
function extractNumbersFromString(inputString: string): number[] {
const regex = /[+-]?\d+(\.\d+)?/g;
const matches = inputString.match(regex);
if (matches) {
return matches.map(Number);
}
return [];
}
function getHighestPrecedenceThreshold(
@@ -47,13 +63,17 @@ function getHighestPrecedenceThreshold(
export function getBackgroundColorAndThresholdCheck(
thresholds: ThresholdProps[],
rawValue: number,
yAxisUnit?: string,
): {
threshold: ThresholdProps;
isConflictingThresholds: boolean;
} {
const matchingThresholds = thresholds.filter((threshold) =>
doesValueSatisfyThreshold(rawValue, threshold, yAxisUnit),
compareThreshold(
extractNumbersFromString(
getYAxisFormattedValue(rawValue.toString(), threshold.thresholdUnit || ''),
)[0],
threshold,
),
);
if (matchingThresholds.length === 0) {

View File

@@ -49,7 +49,7 @@ function evaluateCondition(
* @param columnUnit - The current unit of the value.
* @returns A boolean indicating whether the value meets the threshold condition.
*/
export function evaluateThresholdWithConvertedValue(
function evaluateThresholdWithConvertedValue(
value: number,
thresholdValue: number,
thresholdOperator?: string,

View File

@@ -99,7 +99,6 @@ function GridValueComponent({
<ValueGraph
thresholds={thresholds || []}
rawValue={value}
yAxisUnit={yAxisUnit}
value={
yAxisUnit
? getYAxisFormattedValue(

View File

@@ -356,10 +356,7 @@ function Threshold({
)}
</div>
{isInvalidUnitComparison && (
<Typography.Text
className="invalid-unit"
data-testid="invalid-unit-comparison"
>
<Typography.Text className="invalid-unit">
Threshold unit ({unit}) is not valid in comparison with the{' '}
{selectedGraph === PANEL_TYPES.TABLE ? 'column' : 'y-axis'} unit (
{selectedGraph === PANEL_TYPES.TABLE

View File

@@ -1,8 +1,6 @@
/* eslint-disable react/jsx-props-no-spreading */
import { DndProvider } from 'react-dnd';
import { HTML5Backend } from 'react-dnd-html5-backend';
import { Y_AXIS_UNIT_NAMES } from 'components/YAxisUnitSelector/constants';
import { UniversalYAxisUnit } from 'components/YAxisUnitSelector/types';
import { PANEL_TYPES } from 'constants/queryBuilder';
import { render, screen } from 'tests/test-utils';
@@ -19,18 +17,9 @@ jest.mock('lib/query/createTableColumnsFromQuery', () => ({
// Mock the unitOptions function
jest.mock('container/NewWidget/utils', () => ({
unitOptions: jest.fn(() => [
{
value: UniversalYAxisUnit.NONE,
label: Y_AXIS_UNIT_NAMES[UniversalYAxisUnit.NONE],
},
{
value: UniversalYAxisUnit.PERCENT,
label: Y_AXIS_UNIT_NAMES[UniversalYAxisUnit.PERCENT],
},
{
value: UniversalYAxisUnit.MILLISECONDS,
label: Y_AXIS_UNIT_NAMES[UniversalYAxisUnit.MILLISECONDS],
},
{ value: 'none', label: 'None' },
{ value: '%', label: 'Percent (0 - 100)' },
{ value: 'ms', label: 'Milliseconds (ms)' },
]),
}));
@@ -39,7 +28,7 @@ const defaultProps = {
keyIndex: 0,
thresholdOperator: '>' as const,
thresholdValue: 50,
thresholdUnit: UniversalYAxisUnit.NONE,
thresholdUnit: 'none',
thresholdColor: 'Red',
thresholdFormat: 'Text' as const,
isEditEnabled: true,
@@ -49,11 +38,8 @@ const defaultProps = {
{ value: 'memory_usage', label: 'Memory Usage' },
],
thresholdTableOptions: 'cpu_usage',
columnUnits: {
cpu_usage: UniversalYAxisUnit.PERCENT,
memory_usage: UniversalYAxisUnit.BYTES,
},
yAxisUnit: UniversalYAxisUnit.PERCENT,
columnUnits: { cpu_usage: 'percent', memory_usage: 'bytes' },
yAxisUnit: '%',
moveThreshold: jest.fn(),
};
@@ -82,27 +68,28 @@ describe('Threshold Component Unit Validation', () => {
it('should show validation error when threshold unit is not "none" and units are incompatible', () => {
// Act - Render component with incompatible units (ms vs percent)
renderThreshold({
thresholdUnit: UniversalYAxisUnit.MILLISECONDS,
thresholdUnit: 'ms',
thresholdValue: 50,
});
const errorMessage = screen.getByTestId('invalid-unit-comparison');
// Assert - Validation error should be displayed
expect(errorMessage.textContent).toBe(
`Threshold unit (${UniversalYAxisUnit.MILLISECONDS}) is not valid in comparison with the column unit (${UniversalYAxisUnit.PERCENT})`,
);
expect(
screen.getByText(
/Threshold unit \(ms\) is not valid in comparison with the column unit \(percent\)/i,
),
).toBeInTheDocument();
});
it('should not show validation error when threshold unit matches column unit', () => {
// Act - Render component with matching units
renderThreshold({
thresholdUnit: UniversalYAxisUnit.PERCENT,
thresholdUnit: 'percent',
thresholdValue: 50,
});
// Assert - No validation error should be displayed
expect(
screen.queryByTestId('invalid-unit-comparison'),
screen.queryByText(/Threshold unit.*is not valid in comparison/i),
).not.toBeInTheDocument();
});
@@ -110,16 +97,17 @@ describe('Threshold Component Unit Validation', () => {
// Act - Render component for time series with incompatible units
renderThreshold({
selectedGraph: PANEL_TYPES.TIME_SERIES,
thresholdUnit: UniversalYAxisUnit.MILLISECONDS,
thresholdUnit: 'ms',
thresholdValue: 100,
yAxisUnit: UniversalYAxisUnit.PERCENT,
yAxisUnit: 'percent',
});
const errorMessage = screen.getByTestId('invalid-unit-comparison');
// Assert - Validation error should be displayed
expect(errorMessage.textContent).toBe(
`Threshold unit (${UniversalYAxisUnit.MILLISECONDS}) is not valid in comparison with the y-axis unit (${UniversalYAxisUnit.PERCENT})`,
);
expect(
screen.getByText(
/Threshold unit \(ms\) is not valid in comparison with the y-axis unit \(percent\)/i,
),
).toBeInTheDocument();
});
it('should not show validation error for time series graph when threshold unit is "none"', () => {
@@ -128,39 +116,43 @@ describe('Threshold Component Unit Validation', () => {
selectedGraph: PANEL_TYPES.TIME_SERIES,
thresholdUnit: 'none',
thresholdValue: 100,
yAxisUnit: UniversalYAxisUnit.PERCENT,
yAxisUnit: 'percent',
});
// Assert - No validation error should be displayed
expect(
screen.queryByTestId('invalid-unit-comparison'),
screen.queryByText(/Threshold unit.*is not valid in comparison/i),
).not.toBeInTheDocument();
});
it('should not show validation error when threshold unit is compatible with column unit', () => {
// Act - Render component with compatible units (both in same category - Time)
renderThreshold({
thresholdUnit: UniversalYAxisUnit.SECONDS,
thresholdUnit: 's',
thresholdValue: 100,
columnUnits: { cpu_usage: UniversalYAxisUnit.MILLISECONDS },
columnUnits: { cpu_usage: 'ms' },
thresholdTableOptions: 'cpu_usage',
});
// Assert - No validation error should be displayed
expect(
screen.queryByTestId('invalid-unit-comparison'),
screen.queryByText(/Threshold unit.*is not valid in comparison/i),
).not.toBeInTheDocument();
});
it('should show validation error when threshold unit is in different category than column unit', () => {
// Act - Render component with units from different categories
renderThreshold({
thresholdUnit: UniversalYAxisUnit.BYTES,
thresholdUnit: 'bytes',
thresholdValue: 100,
yAxisUnit: UniversalYAxisUnit.PERCENT,
yAxisUnit: 'percent',
});
const errorMessage = screen.getByTestId('invalid-unit-comparison');
// Assert - Validation error should be displayed
expect(errorMessage.textContent).toBe(
`Threshold unit (${UniversalYAxisUnit.BYTES}) is not valid in comparison with the column unit (${UniversalYAxisUnit.PERCENT})`,
);
expect(
screen.getByText(
/Threshold unit \(bytes\) is not valid in comparison with the column unit \(percent\)/i,
),
).toBeInTheDocument();
});
});

View File

@@ -0,0 +1,12 @@
package signozapiserver
import (
"net/http"
"github.com/gorilla/mux"
)
func (provider *provider) addLogspipelineRoutes(router *mux.Router) error {
router.HandleFunc("/api/v2/pipelines", provider.logspipelineHandler.ListPipelines).Methods(http.MethodGet)
return nil
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
@@ -48,6 +49,7 @@ type provider struct {
authzHandler authz.Handler
zeusHandler zeus.Handler
querierHandler querier.Handler
logspipelineHandler logspipeline.Handler
}
func NewFactory(
@@ -69,6 +71,7 @@ func NewFactory(
authzHandler authz.Handler,
zeusHandler zeus.Handler,
querierHandler querier.Handler,
logspipelineHandler logspipeline.Handler,
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
return newProvider(
@@ -93,6 +96,7 @@ func NewFactory(
authzHandler,
zeusHandler,
querierHandler,
logspipelineHandler,
)
})
}
@@ -119,6 +123,7 @@ func newProvider(
authzHandler authz.Handler,
zeusHandler zeus.Handler,
querierHandler querier.Handler,
logspipelineHandler logspipeline.Handler,
) (apiserver.APIServer, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
router := mux.NewRouter().UseEncodedPath()
@@ -143,6 +148,7 @@ func newProvider(
authzHandler: authzHandler,
zeusHandler: zeusHandler,
querierHandler: querierHandler,
logspipelineHandler: logspipelineHandler,
}
provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
@@ -223,9 +229,14 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
return err
}
if err := provider.addLogspipelineRoutes(router); err != nil {
return err
}
return nil
}
func newSecuritySchemes(role types.Role) []handler.OpenAPISecurityScheme {
return []handler.OpenAPISecurityScheme{
{Name: ctxtypes.AuthTypeAPIKey.StringValue(), Scopes: []string{role.String()}},

View File

@@ -0,0 +1,91 @@
package impllogspipeline
import (
"net/http"
"strconv"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
)
type handler struct {
module logspipeline.Module
}
func NewHandler(module logspipeline.Module) logspipeline.Handler {
return &handler{module: module}
}
func (h *handler) ListPipelines(w http.ResponseWriter, r *http.Request) {
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
orgID, errv2 := valuer.NewUUID(claims.OrgID)
if errv2 != nil {
render.Error(w, errv2)
return
}
version, err := ParseAgentConfigVersion(r)
if err != nil {
render.Error(w, err)
return
}
if version != -1 {
pipelines, err := h.module.ListPipelinesByVersion(r.Context(), orgID, version)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, pipelines)
return
}
pipelines, err := h.module.ListPipelines(r.Context(), orgID)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, pipelines)
}
func (h *handler) GetPipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) CreatePipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) UpdatePipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) DeletePipeline(w http.ResponseWriter, r *http.Request) {
}
func ParseAgentConfigVersion(r *http.Request) (int, error) {
versionString := mux.Vars(r)["version"]
if versionString == "latest" {
return -1, nil
}
version64, err := strconv.ParseInt(versionString, 0, 8)
if err != nil {
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
}
if version64 <= 0 {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
}
return int(version64), nil
}

View File

@@ -0,0 +1,230 @@
package impllogspipeline
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"go.uber.org/zap"
)
type module struct {
sqlstore sqlstore.SQLStore
}
func NewModule(sqlstore sqlstore.SQLStore) logspipeline.Module {
return &module{sqlstore: sqlstore}
}
func (m *module) ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error) {
latestVersion := -1
// get latest agent config
lastestConfig, err := agentConf.GetLatestVersion(ctx, orgID, opamptypes.ElementTypeLogPipelines)
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
return nil, err
}
if lastestConfig != nil {
latestVersion = lastestConfig.Version
}
return m.ListPipelinesByVersion(ctx, orgID, latestVersion)
}
func (m *module) ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error) {
var stored []pipelinetypes.StoreablePipeline
err := m.sqlstore.BunDB().NewSelect().
Model(&stored).
Join("JOIN agent_config_element e ON p.id = e.element_id").
Join("JOIN agent_config_version v ON v.id = e.version_id").
Where("e.element_type = ?", opamptypes.ElementTypeLogPipelines.StringValue()).
Where("v.version = ?", version).
Where("v.org_id = ?", orgID.StringValue()).
Order("p.order_id ASC").
Scan(ctx)
if err != nil {
return nil, err
}
pipelines := make([]pipelinetypes.GettablePipeline, len(stored))
if len(stored) == 0 {
return pipelines, nil
}
for i := range stored {
pipelines[i].StoreablePipeline = stored[i]
if err := pipelines[i].ParseRawConfig(); err != nil {
return nil, err
}
if err := pipelines[i].ParseFilter(); err != nil {
return nil, err
}
}
return pipelines, nil
}
func (m *module) GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error) {
return nil, nil
}
func (m *module) CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
storeable, err := pipeline.ToStoreablePipeline()
if err != nil {
return nil, err
}
// regenerate the id and set other fields
storeable.Identifiable.ID = valuer.GenerateUUID()
storeable.OrgID = orgID.String()
storeable.TimeAuditable = types.TimeAuditable{
CreatedAt: time.Now(),
}
storeable.UserAuditable = types.UserAuditable{
CreatedBy: claims.Email,
}
_, err = m.sqlstore.BunDB().NewInsert().
Model(&storeable).
Exec(ctx)
if err != nil {
zap.L().Error("error in inserting pipeline data", zap.Error(err))
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to insert pipeline")
}
return &pipelinetypes.GettablePipeline{
StoreablePipeline: *storeable,
Filter: pipeline.Filter,
Config: pipeline.Config,
}, nil
}
func (m *module) UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
if err := pipeline.IsValid(); err != nil {
return nil, err
}
storeable, err := pipeline.ToStoreablePipeline()
if err != nil {
return nil, err
}
storeable.OrgID = orgID.String()
storeable.TimeAuditable = types.TimeAuditable{
UpdatedAt: time.Now(),
}
storeable.UserAuditable = types.UserAuditable{
UpdatedBy: claims.Email,
}
// get id from storeable pipeline
id := storeable.ID.StringValue()
// depending on the order_id update the rest of the table
// example 1: total available pipelines are 6, and order_id 5 is moved to 2, then we need to update the rest of the table
// old: 1, 2, 3, 4, 5, 6
// ^ |
// |_________|
// So pipelines starting from 2nd position till 4th position shift to right (or increase their order_id) by 1 position
// example 2: total available pipelines are 6, and order_id 2 is moved to 4, then we need to update the rest of the table
// old: 1, 2, 3, 4, 5, 6
// | ^
// |_____|
// So pipelines starting from 3rd position till 4th position shift to left (or decrease their order_id) by 1 position
if err := m.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
db := m.sqlstore.BunDBCtx(ctx)
var existing pipelinetypes.StoreablePipeline
if err := db.NewSelect().
Model(&existing).
Where("id = ?", id).
Where("org_id = ?", orgID.StringValue()).
Scan(ctx); err != nil {
return m.sqlstore.WrapNotFoundErrf(
err,
errors.CodeNotFound,
"pipeline with id %s does not exist in org %s",
id,
orgID.StringValue(),
)
}
oldOrderID := existing.OrderID
newOrderID := storeable.OrderID
// Reorder other pipelines if the order has changed.
if newOrderID != oldOrderID {
if err := reorderPipelinesInTx(ctx, db, orgID.StringValue(), oldOrderID, newOrderID); err != nil {
return err
}
}
// Preserve primary key and immutable fields.
storeable.ID = existing.ID
// Persist the updated pipeline (including its new order).
if _, err := db.NewUpdate().
Model(storeable).
Where("id = ?", id).
Where("org_id = ?", orgID.StringValue()).
Exec(ctx); err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return &pipelinetypes.GettablePipeline{
StoreablePipeline: *storeable,
Filter: pipeline.Filter,
Config: pipeline.Config,
}, nil
}
// reorderPipelinesInTx updates order_id of other pipelines in a transaction-aware way.
// It assumes that all pipelines for a given org have consecutive order_id values starting from 1.
// The logic is:
// - When moving a pipeline from a higher position to a lower position (e.g., 5 -> 2),
// all pipelines in [newOrderID, oldOrderID) are shifted right by +1.
// - When moving from a lower position to a higher position (e.g., 2 -> 4),
// all pipelines in (oldOrderID, newOrderID] are shifted left by -1.
func reorderPipelinesInTx(ctx context.Context, tx bun.IDB, orgID string, oldOrderID, newOrderID int) error {
switch {
case newOrderID < oldOrderID:
// Move up: shift affected pipelines down (order_id + 1).
_, err := tx.NewUpdate().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Set("order_id = order_id + 1").
Where("org_id = ?", orgID).
Where("order_id >= ?", newOrderID).
Where("order_id < ?", oldOrderID).
Exec(ctx)
return err
case newOrderID > oldOrderID:
// Move down: shift affected pipelines up (order_id - 1).
_, err := tx.NewUpdate().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Set("order_id = order_id - 1").
Where("org_id = ?", orgID).
Where("order_id > ?", oldOrderID).
Where("order_id <= ?", newOrderID).
Exec(ctx)
return err
default:
return nil
}
}
func (m *module) DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error {
return nil
}

View File

@@ -0,0 +1,27 @@
package logspipeline
import (
"context"
"net/http"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Module interface {
ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error)
ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error)
GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error)
CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error
}
type Handler interface {
ListPipelines(w http.ResponseWriter, r *http.Request)
GetPipeline(w http.ResponseWriter, r *http.Request)
CreatePipeline(w http.ResponseWriter, r *http.Request)
UpdatePipeline(w http.ResponseWriter, r *http.Request)
DeletePipeline(w http.ResponseWriter, r *http.Request)
}

View File

@@ -120,8 +120,6 @@ func FilterResponse(results []*qbtypes.QueryRangeResponse) []*qbtypes.QueryRange
}
}
resultData.Rows = filteredRows
case *qbtypes.ScalarData:
resultData.Data = filterScalarDataIPs(resultData.Columns, resultData.Data)
}
filteredData = append(filteredData, result)
@@ -147,39 +145,6 @@ func shouldIncludeSeries(series *qbtypes.TimeSeries) bool {
return true
}
func filterScalarDataIPs(columns []*qbtypes.ColumnDescriptor, data [][]any) [][]any {
// Find column indices for server address fields
serverColIndices := make([]int, 0)
for i, col := range columns {
if col.Name == derivedKeyHTTPHost {
serverColIndices = append(serverColIndices, i)
}
}
if len(serverColIndices) == 0 {
return data
}
filtered := make([][]any, 0, len(data))
for _, row := range data {
includeRow := true
for _, colIdx := range serverColIndices {
if colIdx < len(row) {
if strVal, ok := row[colIdx].(string); ok {
if net.ParseIP(strVal) != nil {
includeRow = false
break
}
}
}
}
if includeRow {
filtered = append(filtered, row)
}
}
return filtered
}
func shouldIncludeRow(row *qbtypes.RawRow) bool {
if row.Data != nil {
if domainVal, ok := row.Data[derivedKeyHTTPHost]; ok {

View File

@@ -117,59 +117,6 @@ func TestFilterResponse(t *testing.T) {
},
},
},
{
name: "should filter out IP addresses from scalar data",
input: []*qbtypes.QueryRangeResponse{
{
Data: qbtypes.QueryData{
Results: []any{
&qbtypes.ScalarData{
QueryName: "endpoints",
Columns: []*qbtypes.ColumnDescriptor{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
Type: qbtypes.ColumnTypeGroup,
},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
Type: qbtypes.ColumnTypeAggregation,
},
},
Data: [][]any{
{"192.168.1.1", 10},
{"example.com", 20},
{"10.0.0.1", 5},
},
},
},
},
},
},
expected: []*qbtypes.QueryRangeResponse{
{
Data: qbtypes.QueryData{
Results: []any{
&qbtypes.ScalarData{
QueryName: "endpoints",
Columns: []*qbtypes.ColumnDescriptor{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
Type: qbtypes.ColumnTypeGroup,
},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
Type: qbtypes.ColumnTypeAggregation,
},
},
Data: [][]any{
{"example.com", 20},
},
},
},
},
},
},
},
}
for _, tt := range tests {

View File

@@ -9,6 +9,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
"github.com/SigNoz/signoz/pkg/queryparser"
@@ -4048,26 +4049,6 @@ func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
aH.WriteJSON(w, r, model.GetLogsAggregatesResponse{})
}
func parseAgentConfigVersion(r *http.Request) (int, error) {
versionString := mux.Vars(r)["version"]
if versionString == "latest" {
return -1, nil
}
version64, err := strconv.ParseInt(versionString, 0, 8)
if err != nil {
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
}
if version64 <= 0 {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
}
return int(version64), nil
}
func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
req := logparsingpipeline.PipelinesPreviewRequest{}
@@ -4098,7 +4079,7 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
return
}
version, err := parseAgentConfigVersion(r)
version, err := impllogspipeline.ParseAgentConfigVersion(r)
if err != nil {
render.Error(w, err)
return

View File

@@ -13,6 +13,8 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -66,6 +68,7 @@ type Modules struct {
SpanPercentile spanpercentile.Module
MetricsExplorer metricsexplorer.Module
Promote promote.Module
LogsPipeline logspipeline.Module
}
func NewModules(
@@ -110,5 +113,6 @@ func NewModules(
Services: implservices.NewModule(querier, telemetryStore),
MetricsExplorer: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, ruleStore, dashboard, providerSettings, config.MetricsExplorer),
Promote: implpromote.NewModule(telemetryMetadataStore, telemetryStore),
LogsPipeline: impllogspipeline.NewModule(sqlstore),
}
}

View File

@@ -18,6 +18,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
@@ -59,6 +60,7 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
struct{ authz.Handler }{},
struct{ zeus.Handler }{},
struct{ querier.Handler }{},
struct{ logspipeline.Handler }{},
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
if err != nil {
return nil, err

View File

@@ -23,6 +23,7 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/global/signozglobal"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
@@ -254,6 +255,7 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
handlers.AuthzHandler,
handlers.ZeusHandler,
handlers.QuerierHandler,
impllogspipeline.NewHandler(modules.LogsPipeline),
),
)
}

View File

@@ -10,6 +10,7 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
@@ -266,6 +267,37 @@ func (p *PostablePipeline) IsValid() error {
return nil
}
func (p *PostablePipeline) ToStoreablePipeline() (*StoreablePipeline, error) {
rawConfig, err := json.Marshal(p.Config)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to unmarshal postable pipeline config")
}
filter, err := json.Marshal(p.Filter)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to marshal postable pipeline filter")
}
identifier := valuer.GenerateUUID()
if p.ID != "" {
identifier, err = valuer.NewUUID(p.ID)
if err != nil {
return nil, errors.WithAdditionalf(err, "failed to parse postable pipeline id")
}
}
return &StoreablePipeline{
Identifiable: types.Identifiable{
ID: identifier,
},
OrderID: p.OrderID,
Enabled: p.Enabled,
Name: p.Name,
Alias: p.Alias,
Description: p.Description,
FilterString: string(filter),
ConfigJSON: string(rawConfig),
}, nil
}
func isValidOperator(op PipelineOperator) error {
if op.ID == "" {
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "PipelineOperator.ID is required")