Compare commits

..

9 Commits

Author SHA1 Message Date
Karan Balani
4ca80c44fd feat: add org id support in root user config 2026-02-25 13:26:45 +05:30
Ishan
c579614d56 feat: color fallback and red checks (#10389)
* feat: color fallback and red checks

* feat: testcase added
2026-02-25 11:54:22 +05:30
Ishan
78ba2ba356 feat: text selection block (#10373)
* feat: text selection block

* chore: added test file
2026-02-25 11:38:15 +05:30
Ishan
7fd4762e2a feat: ui bugs body width and table css (#10377)
* feat: ui bugs body width and table css

* feat: defualt open search overview

* feat: added timerRef to cleanup
2026-02-25 11:25:54 +05:30
Nageshbansal
4e4c9ce5af chore: enable metadataexporter in docker (#10409)
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
2026-02-25 03:13:27 +05:30
Srikanth Chekuri
7605775a38 chore: remove support for non v5 version in rules (#10406) 2026-02-24 23:16:21 +05:30
Vinicius Lourenço
cb1a2a8a13 perf(bundle-size): lazy load pages to reduce main bundle size (#10230)
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
2026-02-24 10:41:40 +00:00
Nikhil Soni
1a5d37b25a fix: add missing filtering for ip address for scalar data (#10264)
* fix: add missing filtering for ip address for scalar data

In domain listing api for external api monitoring,
we have option to filter out the IP address but
it only handles timeseries and raw type data while
domain list handler returns scalar data.

* fix: switch to new derived attributes for ip filtering

---------

Co-authored-by: Nityananda Gohain <nityanandagohain@gmail.com>
2026-02-24 10:26:10 +00:00
Piyush Singariya
bc4273f2f8 chore: test clickhouse version 25.12.5 (#10402) 2026-02-24 14:55:51 +05:30
31 changed files with 640 additions and 552 deletions

View File

@@ -54,7 +54,7 @@ jobs:
- sqlite
clickhouse-version:
- 25.5.6
- 25.10.5
- 25.12.5
schema-migrator-version:
- v0.142.0
postgres-version:

View File

@@ -82,6 +82,12 @@ exporters:
timeout: 45s
sending_queue:
enabled: false
metadataexporter:
cache:
provider: in_memory
dsn: tcp://clickhouse:9000/signoz_metadata
enabled: true
timeout: 45s
service:
telemetry:
logs:
@@ -93,19 +99,19 @@ service:
traces:
receivers: [otlp]
processors: [signozspanmetrics/delta, batch]
exporters: [clickhousetraces, signozmeter]
exporters: [clickhousetraces, metadataexporter, signozmeter]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [signozclickhousemetrics, signozmeter]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
metrics/prometheus:
receivers: [prometheus]
processors: [batch]
exporters: [signozclickhousemetrics, signozmeter]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
logs:
receivers: [otlp]
processors: [batch]
exporters: [clickhouselogsexporter, signozmeter]
exporters: [clickhouselogsexporter, metadataexporter, signozmeter]
metrics/meter:
receivers: [signozmeter]
processors: [batch/meter]

View File

@@ -82,6 +82,12 @@ exporters:
timeout: 45s
sending_queue:
enabled: false
metadataexporter:
cache:
provider: in_memory
dsn: tcp://clickhouse:9000/signoz_metadata
enabled: true
timeout: 45s
service:
telemetry:
logs:
@@ -93,19 +99,19 @@ service:
traces:
receivers: [otlp]
processors: [signozspanmetrics/delta, batch]
exporters: [clickhousetraces, signozmeter]
exporters: [clickhousetraces, metadataexporter, signozmeter]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [signozclickhousemetrics, signozmeter]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
metrics/prometheus:
receivers: [prometheus]
processors: [batch]
exporters: [signozclickhousemetrics, signozmeter]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
logs:
receivers: [otlp]
processors: [batch]
exporters: [clickhouselogsexporter, signozmeter]
exporters: [clickhouselogsexporter, metadataexporter, signozmeter]
metrics/meter:
receivers: [signozmeter]
processors: [batch/meter]

View File

@@ -308,3 +308,15 @@ export const PublicDashboardPage = Loadable(
/* webpackChunkName: "Public Dashboard Page" */ 'pages/PublicDashboard'
),
);
export const AlertTypeSelectionPage = Loadable(
() =>
import(
/* webpackChunkName: "Alert Type Selection Page" */ 'pages/AlertTypeSelection'
),
);
export const MeterExplorerPage = Loadable(
() =>
import(/* webpackChunkName: "Meter Explorer Page" */ 'pages/MeterExplorer'),
);

View File

@@ -1,12 +1,10 @@
import { RouteProps } from 'react-router-dom';
import ROUTES from 'constants/routes';
import AlertTypeSelectionPage from 'pages/AlertTypeSelection';
import MessagingQueues from 'pages/MessagingQueues';
import MeterExplorer from 'pages/MeterExplorer';
import {
AlertHistory,
AlertOverview,
AlertTypeSelectionPage,
AllAlertChannels,
AllErrors,
ApiMonitoring,
@@ -29,6 +27,8 @@ import {
LogsExplorer,
LogsIndexToFields,
LogsSaveViews,
MessagingQueuesMainPage,
MeterExplorerPage,
MetricsExplorer,
OldLogsExplorer,
Onboarding,
@@ -399,28 +399,28 @@ const routes: AppRoutes[] = [
{
path: ROUTES.MESSAGING_QUEUES_KAFKA,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_KAFKA',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_CELERY_TASK,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_CELERY_TASK',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_OVERVIEW,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_OVERVIEW',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_KAFKA_DETAIL,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_KAFKA_DETAIL',
isPrivate: true,
},
@@ -463,21 +463,21 @@ const routes: AppRoutes[] = [
{
path: ROUTES.METER,
exact: true,
component: MeterExplorer,
component: MeterExplorerPage,
key: 'METER',
isPrivate: true,
},
{
path: ROUTES.METER_EXPLORER,
exact: true,
component: MeterExplorer,
component: MeterExplorerPage,
key: 'METER_EXPLORER',
isPrivate: true,
},
{
path: ROUTES.METER_EXPLORER_VIEWS,
exact: true,
component: MeterExplorer,
component: MeterExplorerPage,
key: 'METER_EXPLORER_VIEWS',
isPrivate: true,
},

View File

@@ -21,7 +21,7 @@ export function getDefaultCellStyle(isDarkMode?: boolean): CSSProperties {
export const defaultTableStyle: CSSProperties = {
minWidth: '40rem',
maxWidth: '60rem',
maxWidth: '90rem',
};
export const defaultListViewPanelStyle: CSSProperties = {

View File

@@ -1,6 +1,7 @@
import { ReactNode, useState } from 'react';
import { ReactNode, useCallback, useEffect, useRef, useState } from 'react';
import MEditor, { EditorProps, Monaco } from '@monaco-editor/react';
import { Color } from '@signozhq/design-tokens';
import type { InputRef } from 'antd';
import {
Button,
Collapse,
@@ -46,12 +47,23 @@ function Overview({
handleChangeSelectedView,
}: Props): JSX.Element {
const [isWrapWord, setIsWrapWord] = useState<boolean>(true);
const [isSearchVisible, setIsSearchVisible] = useState<boolean>(false);
const [isSearchVisible, setIsSearchVisible] = useState<boolean>(true);
const [isAttributesExpanded, setIsAttributesExpanded] = useState<boolean>(
true,
);
const [fieldSearchInput, setFieldSearchInput] = useState<string>('');
const focusTimerRef = useRef<ReturnType<typeof setTimeout>>();
const searchInputRef = useCallback((node: InputRef | null) => {
clearTimeout(focusTimerRef.current);
if (node) {
focusTimerRef.current = setTimeout(() => node.focus(), 100);
}
}, []);
useEffect(() => (): void => clearTimeout(focusTimerRef.current), []);
const isDarkMode = useIsDarkMode();
const options: EditorProps['options'] = {
@@ -196,7 +208,7 @@ function Overview({
<>
{isSearchVisible && (
<Input
autoFocus
ref={searchInputRef}
placeholder="Search for a field..."
className="search-input"
value={fieldSearchInput}

View File

@@ -0,0 +1,34 @@
import { Color } from '@signozhq/design-tokens';
import { getColorsForSeverityLabels, isRedLike } from '../utils';
describe('getColorsForSeverityLabels', () => {
it('should return slate for blank labels', () => {
expect(getColorsForSeverityLabels('', 0)).toBe(Color.BG_SLATE_300);
expect(getColorsForSeverityLabels(' ', 0)).toBe(Color.BG_SLATE_300);
});
it('should return correct colors for known severity variants', () => {
expect(getColorsForSeverityLabels('INFO', 0)).toBe(Color.BG_ROBIN_600);
expect(getColorsForSeverityLabels('ERROR', 0)).toBe(Color.BG_CHERRY_600);
expect(getColorsForSeverityLabels('WARN', 0)).toBe(Color.BG_AMBER_600);
expect(getColorsForSeverityLabels('DEBUG', 0)).toBe(Color.BG_AQUA_600);
expect(getColorsForSeverityLabels('TRACE', 0)).toBe(Color.BG_FOREST_600);
expect(getColorsForSeverityLabels('FATAL', 0)).toBe(Color.BG_SAKURA_600);
});
it('should return non-red colors for unrecognized labels at any index', () => {
for (let i = 0; i < 30; i++) {
const color = getColorsForSeverityLabels('4', i);
expect(isRedLike(color)).toBe(false);
}
});
it('should return non-red colors for numeric severity text', () => {
const numericLabels = ['1', '2', '4', '9', '13', '17', '21'];
numericLabels.forEach((label) => {
const color = getColorsForSeverityLabels(label, 0);
expect(isRedLike(color)).toBe(false);
});
});
});

View File

@@ -1,7 +1,16 @@
import { Color } from '@signozhq/design-tokens';
import { themeColors } from 'constants/theme';
import { colors } from 'lib/getRandomColor';
// Function to determine if a color is "red-like" based on its RGB values
export function isRedLike(hex: string): boolean {
const r = parseInt(hex.slice(1, 3), 16);
const g = parseInt(hex.slice(3, 5), 16);
const b = parseInt(hex.slice(5, 7), 16);
return r > 180 && r > g * 1.4 && r > b * 1.4;
}
const SAFE_FALLBACK_COLORS = colors.filter((c) => !isRedLike(c));
const SEVERITY_VARIANT_COLORS: Record<string, string> = {
TRACE: Color.BG_FOREST_600,
Trace: Color.BG_FOREST_500,
@@ -67,8 +76,13 @@ export function getColorsForSeverityLabels(
label: string,
index: number,
): string {
// Check if we have a direct mapping for this severity variant
const variantColor = SEVERITY_VARIANT_COLORS[label.trim()];
const trimmed = label.trim();
if (!trimmed) {
return Color.BG_SLATE_300;
}
const variantColor = SEVERITY_VARIANT_COLORS[trimmed];
if (variantColor) {
return variantColor;
}
@@ -103,5 +117,8 @@ export function getColorsForSeverityLabels(
return Color.BG_SAKURA_500;
}
return colors[index % colors.length] || themeColors.red;
return (
SAFE_FALLBACK_COLORS[index % SAFE_FALLBACK_COLORS.length] ||
Color.BG_SLATE_400
);
}

View File

@@ -111,23 +111,19 @@ const InfinityTable = forwardRef<TableVirtuosoHandle, InfinityTableProps>(
);
const itemContent = useCallback(
(index: number, log: Record<string, unknown>): JSX.Element => {
return (
<div key={log.id as string}>
<TableRow
tableColumns={tableColumns}
index={index}
log={log}
logs={tableViewProps.logs}
hasActions
fontSize={tableViewProps.fontSize}
onShowLogDetails={onSetActiveLog}
isActiveLog={activeLog?.id === log.id}
onClearActiveLog={onCloseActiveLog}
/>
</div>
);
},
(index: number, log: Record<string, unknown>): JSX.Element => (
<TableRow
tableColumns={tableColumns}
index={index}
log={log}
logs={tableViewProps.logs}
hasActions
fontSize={tableViewProps.fontSize}
onShowLogDetails={onSetActiveLog}
isActiveLog={activeLog?.id === log.id}
onClearActiveLog={onCloseActiveLog}
/>
),
[
tableColumns,
onSetActiveLog,
@@ -143,7 +139,8 @@ const InfinityTable = forwardRef<TableVirtuosoHandle, InfinityTableProps>(
{tableColumns
.filter((column) => column.key)
.map((column) => {
const isDragColumn = column.key !== 'expand';
const isDragColumn =
column.key !== 'expand' && column.key !== 'state-indicator';
return (
<TableHeaderCellStyled

View File

@@ -0,0 +1,94 @@
import { act, renderHook } from '@testing-library/react';
import { useActiveLog } from 'hooks/logs/useActiveLog';
import { useIsTextSelected } from 'hooks/useIsTextSelected';
import { ILog } from 'types/api/logs/log';
import useLogDetailHandlers from '../useLogDetailHandlers';
jest.mock('hooks/logs/useActiveLog');
jest.mock('hooks/useIsTextSelected');
const mockOnSetActiveLog = jest.fn();
const mockOnClearActiveLog = jest.fn();
const mockOnAddToQuery = jest.fn();
const mockOnGroupByAttribute = jest.fn();
const mockIsTextSelected = jest.fn();
const mockLog: ILog = {
id: 'log-1',
timestamp: '2024-01-01T00:00:00Z',
date: '2024-01-01',
body: 'test log body',
severityText: 'INFO',
severityNumber: 9,
traceFlags: 0,
traceId: '',
spanID: '',
attributesString: {},
attributesInt: {},
attributesFloat: {},
resources_string: {},
scope_string: {},
attributes_string: {},
severity_text: '',
severity_number: 0,
};
beforeEach(() => {
jest.clearAllMocks();
jest.mocked(useIsTextSelected).mockReturnValue(mockIsTextSelected);
jest.mocked(useActiveLog).mockReturnValue({
activeLog: null,
onSetActiveLog: mockOnSetActiveLog,
onClearActiveLog: mockOnClearActiveLog,
onAddToQuery: mockOnAddToQuery,
onGroupByAttribute: mockOnGroupByAttribute,
});
});
it('should not open log detail when text is selected', () => {
mockIsTextSelected.mockReturnValue(true);
const { result } = renderHook(() => useLogDetailHandlers());
act(() => {
result.current.handleSetActiveLog(mockLog);
});
expect(mockOnSetActiveLog).not.toHaveBeenCalled();
});
it('should open log detail when no text is selected', () => {
mockIsTextSelected.mockReturnValue(false);
const { result } = renderHook(() => useLogDetailHandlers());
act(() => {
result.current.handleSetActiveLog(mockLog);
});
expect(mockOnSetActiveLog).toHaveBeenCalledWith(mockLog);
});
it('should toggle off when clicking the same active log', () => {
mockIsTextSelected.mockReturnValue(false);
jest.mocked(useActiveLog).mockReturnValue({
activeLog: mockLog,
onSetActiveLog: mockOnSetActiveLog,
onClearActiveLog: mockOnClearActiveLog,
onAddToQuery: mockOnAddToQuery,
onGroupByAttribute: mockOnGroupByAttribute,
});
const { result } = renderHook(() => useLogDetailHandlers());
act(() => {
result.current.handleSetActiveLog(mockLog);
});
expect(mockOnClearActiveLog).toHaveBeenCalled();
expect(mockOnSetActiveLog).not.toHaveBeenCalled();
});

View File

@@ -2,6 +2,7 @@ import { useCallback, useState } from 'react';
import { VIEW_TYPES } from 'components/LogDetail/constants';
import type { UseActiveLog } from 'hooks/logs/types';
import { useActiveLog } from 'hooks/logs/useActiveLog';
import { useIsTextSelected } from 'hooks/useIsTextSelected';
import { ILog } from 'types/api/logs/log';
type SelectedTab = typeof VIEW_TYPES[keyof typeof VIEW_TYPES] | undefined;
@@ -28,9 +29,13 @@ function useLogDetailHandlers({
onAddToQuery,
} = useActiveLog();
const [selectedTab, setSelectedTab] = useState<SelectedTab>(defaultTab);
const isTextSelected = useIsTextSelected();
const handleSetActiveLog = useCallback(
(log: ILog, nextTab: SelectedTab = defaultTab): void => {
if (isTextSelected()) {
return;
}
if (activeLog?.id === log.id) {
onClearActiveLog();
setSelectedTab(undefined);
@@ -39,7 +44,7 @@ function useLogDetailHandlers({
onSetActiveLog(log);
setSelectedTab(nextTab ?? defaultTab);
},
[activeLog?.id, defaultTab, onClearActiveLog, onSetActiveLog],
[activeLog?.id, defaultTab, onClearActiveLog, onSetActiveLog, isTextSelected],
);
const handleCloseLogDetail = useCallback((): void => {

View File

@@ -0,0 +1,10 @@
import { useCallback } from 'react';
export function useIsTextSelected(): () => boolean {
return useCallback((): boolean => {
const selection = window.getSelection();
return (
!!selection && !selection.isCollapsed && selection.toString().length > 0
);
}, []);
}

View File

@@ -1,12 +0,0 @@
package signozapiserver
import (
"net/http"
"github.com/gorilla/mux"
)
func (provider *provider) addLogspipelineRoutes(router *mux.Router) error {
router.HandleFunc("/api/v2/pipelines", provider.logspipelineHandler.ListPipelines).Methods(http.MethodGet)
return nil
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
@@ -49,7 +48,6 @@ type provider struct {
authzHandler authz.Handler
zeusHandler zeus.Handler
querierHandler querier.Handler
logspipelineHandler logspipeline.Handler
}
func NewFactory(
@@ -71,7 +69,6 @@ func NewFactory(
authzHandler authz.Handler,
zeusHandler zeus.Handler,
querierHandler querier.Handler,
logspipelineHandler logspipeline.Handler,
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
return newProvider(
@@ -96,7 +93,6 @@ func NewFactory(
authzHandler,
zeusHandler,
querierHandler,
logspipelineHandler,
)
})
}
@@ -123,7 +119,6 @@ func newProvider(
authzHandler authz.Handler,
zeusHandler zeus.Handler,
querierHandler querier.Handler,
logspipelineHandler logspipeline.Handler,
) (apiserver.APIServer, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
router := mux.NewRouter().UseEncodedPath()
@@ -148,7 +143,6 @@ func newProvider(
authzHandler: authzHandler,
zeusHandler: zeusHandler,
querierHandler: querierHandler,
logspipelineHandler: logspipelineHandler,
}
provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
@@ -229,14 +223,9 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
return err
}
if err := provider.addLogspipelineRoutes(router); err != nil {
return err
}
return nil
}
func newSecuritySchemes(role types.Role) []handler.OpenAPISecurityScheme {
return []handler.OpenAPISecurityScheme{
{Name: ctxtypes.AuthTypeAPIKey.StringValue(), Scopes: []string{role.String()}},

View File

@@ -1,91 +0,0 @@
package impllogspipeline
import (
"net/http"
"strconv"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
)
type handler struct {
module logspipeline.Module
}
func NewHandler(module logspipeline.Module) logspipeline.Handler {
return &handler{module: module}
}
func (h *handler) ListPipelines(w http.ResponseWriter, r *http.Request) {
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
orgID, errv2 := valuer.NewUUID(claims.OrgID)
if errv2 != nil {
render.Error(w, errv2)
return
}
version, err := ParseAgentConfigVersion(r)
if err != nil {
render.Error(w, err)
return
}
if version != -1 {
pipelines, err := h.module.ListPipelinesByVersion(r.Context(), orgID, version)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, pipelines)
return
}
pipelines, err := h.module.ListPipelines(r.Context(), orgID)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, pipelines)
}
func (h *handler) GetPipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) CreatePipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) UpdatePipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) DeletePipeline(w http.ResponseWriter, r *http.Request) {
}
func ParseAgentConfigVersion(r *http.Request) (int, error) {
versionString := mux.Vars(r)["version"]
if versionString == "latest" {
return -1, nil
}
version64, err := strconv.ParseInt(versionString, 0, 8)
if err != nil {
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
}
if version64 <= 0 {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
}
return int(version64), nil
}

View File

@@ -1,322 +0,0 @@
package impllogspipeline
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"go.uber.org/zap"
)
type module struct {
sqlstore sqlstore.SQLStore
}
func NewModule(sqlstore sqlstore.SQLStore) logspipeline.Module {
return &module{sqlstore: sqlstore}
}
func (m *module) ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error) {
latestVersion := -1
// get latest agent config
lastestConfig, err := agentConf.GetLatestVersion(ctx, orgID, opamptypes.ElementTypeLogPipelines)
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
return nil, err
}
if lastestConfig != nil {
latestVersion = lastestConfig.Version
}
return m.ListPipelinesByVersion(ctx, orgID, latestVersion)
}
func (m *module) ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error) {
var stored []pipelinetypes.StoreablePipeline
err := m.sqlstore.BunDB().NewSelect().
Model(&stored).
Join("JOIN agent_config_element e ON p.id = e.element_id").
Join("JOIN agent_config_version v ON v.id = e.version_id").
Where("e.element_type = ?", opamptypes.ElementTypeLogPipelines.StringValue()).
Where("v.version = ?", version).
Where("v.org_id = ?", orgID.StringValue()).
Order("p.order_id ASC").
Scan(ctx)
if err != nil {
return nil, err
}
pipelines := make([]pipelinetypes.GettablePipeline, len(stored))
if len(stored) == 0 {
return pipelines, nil
}
for i := range stored {
pipelines[i].StoreablePipeline = stored[i]
if err := pipelines[i].ParseRawConfig(); err != nil {
return nil, err
}
if err := pipelines[i].ParseFilter(); err != nil {
return nil, err
}
}
return pipelines, nil
}
func (m *module) GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error) {
return nil, nil
}
func (m *module) CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
storeable, err := pipeline.ToStoreablePipeline()
if err != nil {
return nil, err
}
// regenerate the id and set other fields
storeable.Identifiable.ID = valuer.GenerateUUID()
storeable.OrgID = orgID.String()
storeable.TimeAuditable = types.TimeAuditable{
CreatedAt: time.Now(),
}
storeable.UserAuditable = types.UserAuditable{
CreatedBy: claims.Email,
}
_, err = m.sqlstore.BunDB().NewInsert().
Model(&storeable).
Exec(ctx)
if err != nil {
zap.L().Error("error in inserting pipeline data", zap.Error(err))
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to insert pipeline")
}
return &pipelinetypes.GettablePipeline{
StoreablePipeline: *storeable,
Filter: pipeline.Filter,
Config: pipeline.Config,
}, nil
}
func (m *module) UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
if err := pipeline.IsValid(); err != nil {
return nil, err
}
storeable, err := pipeline.ToStoreablePipeline()
if err != nil {
return nil, err
}
storeable.OrgID = orgID.String()
storeable.TimeAuditable = types.TimeAuditable{
UpdatedAt: time.Now(),
}
storeable.UserAuditable = types.UserAuditable{
UpdatedBy: claims.Email,
}
// get id from storeable pipeline
id := storeable.ID.StringValue()
// depending on the order_id update the rest of the table
// example 1: total available pipelines are 6, and order_id 5 is moved to 2, then we need to update the rest of the table
// old: 1, 2, 3, 4, 5, 6
// ^ |
// |_________|
// So pipelines starting from 2nd position till 4th position shift to right (or increase their order_id) by 1 position
// example 2: total available pipelines are 6, and order_id 2 is moved to 4, then we need to update the rest of the table
// old: 1, 2, 3, 4, 5, 6
// | ^
// |_____|
// So pipelines starting from 3rd position till 4th position shift to left (or decrease their order_id) by 1 position
if err := m.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
db := m.sqlstore.BunDBCtx(ctx)
var existing pipelinetypes.StoreablePipeline
if err := db.NewSelect().
Column("order_id", "enabled").
Model(&existing).
Where("id = ?", id).
Where("org_id = ?", orgID.StringValue()).
Scan(ctx); err != nil {
return m.sqlstore.WrapNotFoundErrf(
err,
errors.CodeNotFound,
"pipeline with id %s does not exist in org %s",
id,
orgID.StringValue(),
)
}
oldOrderID := existing.OrderID
newOrderID := storeable.OrderID
// Reorder other pipelines if the order has changed.
if newOrderID != oldOrderID {
if err := reorderPipelinesInTx(ctx, db, orgID.StringValue(), oldOrderID, newOrderID); err != nil {
return err
}
}
// Preserve primary key and immutable fields.
storeable.ID = existing.ID
// Persist the updated pipeline (including its new order).
if _, err := db.NewUpdate().
Model(storeable).
Where("id = ?", id).
Where("org_id = ?", orgID.StringValue()).
Exec(ctx); err != nil {
return err
}
// Apply pipelines if the enabled state has changed
if existing.Enabled != storeable.Enabled {
if err := m.applyPipelinesInTx(ctx, orgID, claims, db); err != nil {
return err
}
}
return nil
}); err != nil {
return nil, err
}
return &pipelinetypes.GettablePipeline{
StoreablePipeline: *storeable,
Filter: pipeline.Filter,
Config: pipeline.Config,
}, nil
}
func (m *module) applyPipelinesInTx(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, tx bun.IDB) error {
// Get ids pipelines for the given org
var pipelines []pipelinetypes.StoreablePipeline
if err := tx.NewSelect().
Column("id").
Model(&pipelines).
Where("org_id = ?", orgID.StringValue()).
Scan(ctx); err != nil {
return m.sqlstore.WrapNotFoundErrf(
err,
errors.CodeNotFound,
"no pipelines found for org %s",
orgID.StringValue(),
)
}
// prepare config elements
elements := make([]string, len(pipelines))
for i, p := range pipelines {
elements[i] = p.ID.StringValue()
}
cfg, err := agentConf.StartNewVersion(ctx, orgID, valuer.MustNewUUID(claims.UserID), opamptypes.ElementTypeLogPipelines, elements)
if err != nil || cfg == nil {
return errors.WithAdditionalf(err, "failed to start new version for org %s", orgID.StringValue())
}
return nil
}
// reorderPipelinesInTx updates order_id of other pipelines in a transaction-aware way.
// It assumes that all pipelines for a given org have consecutive order_id values starting from 1.
// The logic is:
// - When moving a pipeline from a higher position to a lower position (e.g., 5 -> 2),
// all pipelines in [newOrderID, oldOrderID) are shifted right by +1.
// - When moving from a lower position to a higher position (e.g., 2 -> 4),
// all pipelines in (oldOrderID, newOrderID] are shifted left by -1.
func reorderPipelinesInTx(ctx context.Context, tx bun.IDB, orgID string, oldOrderID, newOrderID int) error {
switch {
case newOrderID < oldOrderID:
// Move up: shift affected pipelines down (order_id + 1).
_, err := tx.NewUpdate().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Set("order_id = order_id + 1").
Where("org_id = ?", orgID).
Where("order_id >= ?", newOrderID).
Where("order_id < ?", oldOrderID).
Exec(ctx)
return err
case newOrderID > oldOrderID:
// Move down: shift affected pipelines up (order_id - 1).
_, err := tx.NewUpdate().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Set("order_id = order_id - 1").
Where("org_id = ?", orgID).
Where("order_id > ?", oldOrderID).
Where("order_id <= ?", newOrderID).
Exec(ctx)
return err
default:
return nil
}
}
func (m *module) DeletePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) error {
if err := pipeline.IsValid(); err != nil {
return err
}
if err := m.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
db := m.sqlstore.BunDBCtx(ctx)
// Fetch existing pipeline to determine its current order_id.
var existing pipelinetypes.StoreablePipeline
if err := db.NewSelect().
Model(&existing).
Column("order_id", "enabled").
Where("id = ?", pipeline.ID).
Where("org_id = ?", orgID.StringValue()).
Scan(ctx); err != nil {
return m.sqlstore.WrapNotFoundErrf(
err,
errors.CodeNotFound,
"pipeline with id %s does not exist in org %s",
pipeline.ID,
orgID.StringValue(),
)
}
if _, err := db.NewDelete().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Where("id = ?", pipeline.ID).
Where("org_id = ?", orgID.StringValue()).
Exec(ctx); err != nil {
return err
}
// Set order_ids of other pipelines by collapsing the gap left by the deleted pipeline.
if _, err := db.NewUpdate().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Set("order_id = order_id - 1").
Where("org_id = ?", orgID.StringValue()).
Where("order_id > ?", existing.OrderID).
Exec(ctx); err != nil {
return err
}
// Apply pipelines if the deleted pipeline was enabled
if existing.Enabled {
if err := m.applyPipelinesInTx(ctx, orgID, claims, db); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
return nil
}

View File

@@ -1,27 +0,0 @@
package logspipeline
import (
"context"
"net/http"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Module interface {
ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error)
ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error)
GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error)
CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
DeletePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) error
}
type Handler interface {
ListPipelines(w http.ResponseWriter, r *http.Request)
GetPipeline(w http.ResponseWriter, r *http.Request)
CreatePipeline(w http.ResponseWriter, r *http.Request)
UpdatePipeline(w http.ResponseWriter, r *http.Request)
DeletePipeline(w http.ResponseWriter, r *http.Request)
}

View File

@@ -120,6 +120,8 @@ func FilterResponse(results []*qbtypes.QueryRangeResponse) []*qbtypes.QueryRange
}
}
resultData.Rows = filteredRows
case *qbtypes.ScalarData:
resultData.Data = filterScalarDataIPs(resultData.Columns, resultData.Data)
}
filteredData = append(filteredData, result)
@@ -145,6 +147,39 @@ func shouldIncludeSeries(series *qbtypes.TimeSeries) bool {
return true
}
func filterScalarDataIPs(columns []*qbtypes.ColumnDescriptor, data [][]any) [][]any {
// Find column indices for server address fields
serverColIndices := make([]int, 0)
for i, col := range columns {
if col.Name == derivedKeyHTTPHost {
serverColIndices = append(serverColIndices, i)
}
}
if len(serverColIndices) == 0 {
return data
}
filtered := make([][]any, 0, len(data))
for _, row := range data {
includeRow := true
for _, colIdx := range serverColIndices {
if colIdx < len(row) {
if strVal, ok := row[colIdx].(string); ok {
if net.ParseIP(strVal) != nil {
includeRow = false
break
}
}
}
}
if includeRow {
filtered = append(filtered, row)
}
}
return filtered
}
func shouldIncludeRow(row *qbtypes.RawRow) bool {
if row.Data != nil {
if domainVal, ok := row.Data[derivedKeyHTTPHost]; ok {

View File

@@ -117,6 +117,59 @@ func TestFilterResponse(t *testing.T) {
},
},
},
{
name: "should filter out IP addresses from scalar data",
input: []*qbtypes.QueryRangeResponse{
{
Data: qbtypes.QueryData{
Results: []any{
&qbtypes.ScalarData{
QueryName: "endpoints",
Columns: []*qbtypes.ColumnDescriptor{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
Type: qbtypes.ColumnTypeGroup,
},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
Type: qbtypes.ColumnTypeAggregation,
},
},
Data: [][]any{
{"192.168.1.1", 10},
{"example.com", 20},
{"10.0.0.1", 5},
},
},
},
},
},
},
expected: []*qbtypes.QueryRangeResponse{
{
Data: qbtypes.QueryData{
Results: []any{
&qbtypes.ScalarData{
QueryName: "endpoints",
Columns: []*qbtypes.ColumnDescriptor{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
Type: qbtypes.ColumnTypeGroup,
},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
Type: qbtypes.ColumnTypeAggregation,
},
},
Data: [][]any{
{"example.com", 20},
},
},
},
},
},
},
},
}
for _, tt := range tests {

View File

@@ -22,7 +22,8 @@ type RootConfig struct {
}
type OrgConfig struct {
Name string `mapstructure:"name"`
ID valuer.UUID `mapstructure:"id"`
Name string `mapstructure:"name"`
}
type PasswordConfig struct {

View File

@@ -78,6 +78,48 @@ func (s *service) Stop(ctx context.Context) error {
}
func (s *service) reconcile(ctx context.Context) error {
if !s.config.Org.ID.IsZero() {
return s.reconcileWithOrgID(ctx)
}
return s.reconcileByName(ctx)
}
func (s *service) reconcileWithOrgID(ctx context.Context) error {
org, err := s.orgGetter.Get(ctx, s.config.Org.ID)
if err != nil {
if !errors.Ast(err, errors.TypeNotFound) {
return err // something really went wrong
}
// org was not found using id
// check if we can find an org using name
existingOrgByName, nameErr := s.orgGetter.GetByName(ctx, s.config.Org.Name)
if nameErr != nil && !errors.Ast(nameErr, errors.TypeNotFound) {
return nameErr // something really went wrong
}
// we found an org using name
if existingOrgByName != nil {
// the existing org has the same name as config but org id is different
// inform user with actionable message
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput,
"organization with name %q already exists with a different ID %s (expected %s)",
s.config.Org.Name, existingOrgByName.ID.StringValue(), s.config.Org.ID.StringValue(),
)
}
// default - we did not found any org using id and name both - create a new org
newOrg := types.NewOrganizationWithID(s.config.Org.ID, s.config.Org.Name, s.config.Org.Name)
_, err = s.module.CreateFirstUser(ctx, newOrg, s.config.Email.String(), s.config.Email, s.config.Password)
return err
}
return s.reconcileRootUser(ctx, org.ID)
}
func (s *service) reconcileByName(ctx context.Context) error {
org, err := s.orgGetter.GetByName(ctx, s.config.Org.Name)
if err != nil {
if errors.Ast(err, errors.TypeNotFound) {

View File

@@ -9,7 +9,6 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
"github.com/SigNoz/signoz/pkg/queryparser"
@@ -4049,6 +4048,26 @@ func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
aH.WriteJSON(w, r, model.GetLogsAggregatesResponse{})
}
func parseAgentConfigVersion(r *http.Request) (int, error) {
versionString := mux.Vars(r)["version"]
if versionString == "latest" {
return -1, nil
}
version64, err := strconv.ParseInt(versionString, 0, 8)
if err != nil {
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
}
if version64 <= 0 {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
}
return int(version64), nil
}
func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
req := logparsingpipeline.PipelinesPreviewRequest{}
@@ -4079,7 +4098,7 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
return
}
version, err := impllogspipeline.ParseAgentConfigVersion(r)
version, err := parseAgentConfigVersion(r)
if err != nil {
render.Error(w, err)
return

View File

@@ -13,8 +13,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -68,7 +66,6 @@ type Modules struct {
SpanPercentile spanpercentile.Module
MetricsExplorer metricsexplorer.Module
Promote promote.Module
LogsPipeline logspipeline.Module
}
func NewModules(
@@ -113,6 +110,5 @@ func NewModules(
Services: implservices.NewModule(querier, telemetryStore),
MetricsExplorer: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, ruleStore, dashboard, providerSettings, config.MetricsExplorer),
Promote: implpromote.NewModule(telemetryMetadataStore, telemetryStore),
LogsPipeline: impllogspipeline.NewModule(sqlstore),
}
}

View File

@@ -18,7 +18,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
@@ -60,7 +59,6 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
struct{ authz.Handler }{},
struct{ zeus.Handler }{},
struct{ querier.Handler }{},
struct{ logspipeline.Handler }{},
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
if err != nil {
return nil, err

View File

@@ -23,7 +23,6 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/global/signozglobal"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
@@ -170,6 +169,7 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewAddAnonymousPublicDashboardTransactionFactory(sqlstore),
sqlmigration.NewAddRootUserFactory(sqlstore, sqlschema),
sqlmigration.NewAddUserEmailOrgIDIndexFactory(sqlstore, sqlschema),
sqlmigration.NewMigrateRulesV4ToV5Factory(sqlstore, telemetryStore),
)
}
@@ -255,7 +255,6 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
handlers.AuthzHandler,
handlers.ZeusHandler,
handlers.QuerierHandler,
impllogspipeline.NewHandler(modules.LogsPipeline),
),
)
}

View File

@@ -0,0 +1,209 @@
package sqlmigration
import (
"context"
"database/sql"
"encoding/json"
"log/slog"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/transition"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type migrateRulesV4ToV5 struct {
store sqlstore.SQLStore
telemetryStore telemetrystore.TelemetryStore
logger *slog.Logger
}
func NewMigrateRulesV4ToV5Factory(
store sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(
factory.MustNewName("migrate_rules_post_deprecation"),
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return &migrateRulesV4ToV5{
store: store,
telemetryStore: telemetryStore,
logger: ps.Logger,
}, nil
})
}
func (migration *migrateRulesV4ToV5) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *migrateRulesV4ToV5) getLogDuplicateKeys(ctx context.Context) ([]string, error) {
query := `
SELECT name
FROM (
SELECT DISTINCT name FROM signoz_logs.distributed_logs_attribute_keys
INTERSECT
SELECT DISTINCT name FROM signoz_logs.distributed_logs_resource_keys
)
ORDER BY name
`
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
if err != nil {
migration.logger.WarnContext(ctx, "failed to query log duplicate keys", "error", err)
return nil, nil
}
defer rows.Close()
var keys []string
for rows.Next() {
var key string
if err := rows.Scan(&key); err != nil {
migration.logger.WarnContext(ctx, "failed to scan log duplicate key", "error", err)
continue
}
keys = append(keys, key)
}
return keys, nil
}
func (migration *migrateRulesV4ToV5) getTraceDuplicateKeys(ctx context.Context) ([]string, error) {
query := `
SELECT tagKey
FROM signoz_traces.distributed_span_attributes_keys
WHERE tagType IN ('tag', 'resource')
GROUP BY tagKey
HAVING COUNT(DISTINCT tagType) > 1
ORDER BY tagKey
`
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
if err != nil {
migration.logger.WarnContext(ctx, "failed to query trace duplicate keys", "error", err)
return nil, nil
}
defer rows.Close()
var keys []string
for rows.Next() {
var key string
if err := rows.Scan(&key); err != nil {
migration.logger.WarnContext(ctx, "failed to scan trace duplicate key", "error", err)
continue
}
keys = append(keys, key)
}
return keys, nil
}
func (migration *migrateRulesV4ToV5) Up(ctx context.Context, db *bun.DB) error {
logsKeys, err := migration.getLogDuplicateKeys(ctx)
if err != nil {
return err
}
tracesKeys, err := migration.getTraceDuplicateKeys(ctx)
if err != nil {
return err
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
var rules []struct {
ID string `bun:"id"`
Data map[string]any `bun:"data"`
}
err = tx.NewSelect().
Table("rule").
Column("id", "data").
Scan(ctx, &rules)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return err
}
alertsMigrator := transition.NewAlertMigrateV5(migration.logger, logsKeys, tracesKeys)
count := 0
for _, rule := range rules {
version, _ := rule.Data["version"].(string)
if version == "v5" {
continue
}
if version == "" {
migration.logger.WarnContext(ctx, "unexpected empty version for rule", "rule_id", rule.ID)
}
migration.logger.InfoContext(ctx, "migrating rule v4 to v5", "rule_id", rule.ID, "current_version", version)
// Check if the queries envelope already exists and is non-empty
hasQueriesEnvelope := false
if condition, ok := rule.Data["condition"].(map[string]any); ok {
if compositeQuery, ok := condition["compositeQuery"].(map[string]any); ok {
if queries, ok := compositeQuery["queries"].([]any); ok && len(queries) > 0 {
hasQueriesEnvelope = true
}
}
}
if hasQueriesEnvelope {
// already has queries envelope, just bump version
// this is because user made a mistake of choosing version
migration.logger.InfoContext(ctx, "rule already has queries envelope, bumping version", "rule_id", rule.ID)
rule.Data["version"] = "v5"
} else {
// old format, run full migration
migration.logger.InfoContext(ctx, "rule has old format, running full migration", "rule_id", rule.ID)
updated := alertsMigrator.Migrate(ctx, rule.Data)
if !updated {
migration.logger.WarnContext(ctx, "expected updated to be true but got false", "rule_id", rule.ID)
continue
}
rule.Data["version"] = "v5"
}
dataJSON, err := json.Marshal(rule.Data)
if err != nil {
return err
}
_, err = tx.NewUpdate().
Table("rule").
Set("data = ?", string(dataJSON)).
Where("id = ?", rule.ID).
Exec(ctx)
if err != nil {
return err
}
count++
}
if count != 0 {
migration.logger.InfoContext(ctx, "migrate v4 alerts", "count", count)
}
return tx.Commit()
}
func (migration *migrateRulesV4ToV5) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@@ -41,6 +41,21 @@ func NewOrganization(displayName string, name string) *Organization {
}
}
func NewOrganizationWithID(id valuer.UUID, displayName string, name string) *Organization {
return &Organization{
Identifiable: Identifiable{
ID: id,
},
TimeAuditable: TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
Name: name,
DisplayName: displayName,
Key: NewOrganizationKey(id),
}
}
func NewOrganizationKey(orgID valuer.UUID) uint32 {
hasher := fnv.New32a()

View File

@@ -10,7 +10,6 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
@@ -267,37 +266,6 @@ func (p *PostablePipeline) IsValid() error {
return nil
}
func (p *PostablePipeline) ToStoreablePipeline() (*StoreablePipeline, error) {
rawConfig, err := json.Marshal(p.Config)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to unmarshal postable pipeline config")
}
filter, err := json.Marshal(p.Filter)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to marshal postable pipeline filter")
}
identifier := valuer.GenerateUUID()
if p.ID != "" {
identifier, err = valuer.NewUUID(p.ID)
if err != nil {
return nil, errors.WithAdditionalf(err, "failed to parse postable pipeline id")
}
}
return &StoreablePipeline{
Identifiable: types.Identifiable{
ID: identifier,
},
OrderID: p.OrderID,
Enabled: p.Enabled,
Name: p.Name,
Alias: p.Alias,
Description: p.Description,
FilterString: string(filter),
ConfigJSON: string(rawConfig),
}, nil
}
func isValidOperator(op PipelineOperator) error {
if op.ID == "" {
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "PipelineOperator.ID is required")

View File

@@ -355,6 +355,10 @@ func (r *PostableRule) validate() error {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "composite query is required"))
}
if r.Version != "v5" {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "only version v5 is supported, got %q", r.Version))
}
if isAllQueriesDisabled(r.RuleCondition.CompositeQuery) {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "all queries are disabled in rule condition"))
}

View File

@@ -108,6 +108,7 @@ func TestParseIntoRule(t *testing.T) {
"ruleType": "threshold_rule",
"evalWindow": "5m",
"frequency": "1m",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -150,6 +151,7 @@ func TestParseIntoRule(t *testing.T) {
content: []byte(`{
"alert": "DefaultsRule",
"ruleType": "threshold_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -187,6 +189,7 @@ func TestParseIntoRule(t *testing.T) {
initRule: PostableRule{},
content: []byte(`{
"alert": "PromQLRule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "promql",
@@ -256,6 +259,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
content: []byte(`{
"alert": "SeverityLabelTest",
"schemaVersion": "v1",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -344,6 +348,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
content: []byte(`{
"alert": "NoLabelsTest",
"schemaVersion": "v1",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -384,6 +389,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
content: []byte(`{
"alert": "OverwriteTest",
"schemaVersion": "v1",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -474,6 +480,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
content: []byte(`{
"alert": "V2Test",
"schemaVersion": "v2",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -517,6 +524,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
initRule: PostableRule{},
content: []byte(`{
"alert": "DefaultSchemaTest",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -569,6 +577,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
func TestParseIntoRuleThresholdGeneration(t *testing.T) {
content := []byte(`{
"alert": "TestThresholds",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -639,6 +648,7 @@ func TestParseIntoRuleMultipleThresholds(t *testing.T) {
"schemaVersion": "v2",
"alert": "MultiThresholdAlert",
"ruleType": "threshold_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -732,6 +742,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyBelowTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -766,6 +777,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyBelowTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -799,6 +811,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyAboveTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -833,6 +846,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyAboveTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -866,6 +880,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyBelowAllTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -901,6 +916,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyBelowAllTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -935,6 +951,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyOutOfBoundsTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -969,6 +986,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "ThresholdTest",
"ruleType": "threshold_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -1003,6 +1021,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "ThresholdTest",
"ruleType": "threshold_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",