Compare commits

..

7 Commits

Author SHA1 Message Date
Jatinderjit Singh
e247acabd4 refactor: pass MaintenanceMuter directly to pipelineBuilder
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 16:38:16 +05:30
Jatinderjit Singh
f8ecc2f305 chore: replace SPDX tag with full Apache 2.0 license boilerplate
The full license text is unambiguously compliant with Apache 2.0 Section 4(a),
which requires giving recipients "a copy of this License".

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:43:10 +05:30
Jatinderjit Singh
c84dc69be8 chore: add license header to pipeline_builder.go
Copied code originates from Apache-2.0 licensed Prometheus Alertmanager;
add dual copyright + SPDX identifier following the repo's convention.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:43:10 +05:30
Jatinderjit Singh
211436cf54 refactor: move maintenance mute stage into custom pipelineBuilder
Copy notify.PipelineBuilder locally so we can inject mms between the
silence stage and the receiver stage (GossipSettle → Inhibit →
TimeActive → TimeMute → Silence → mms → Receiver), matching the
correct suppression order the team requires.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:43:10 +05:30
Jatinderjit Singh
aaeec725b3 refactor: wrap routing pipeline once instead of per-route injection
Replace the per-route-entry loop with a single MultiStage wrap so
maintenance suppression runs once per dispatch group before routing.
2026-04-27 11:43:10 +05:30
Jatinderjit Singh
fabc716709 add maintenanceMuteStage to move planned maintenance to alertmanager
Rules previously skipped rule.Eval() entirely during maintenance windows.
This change moves suppression to MaintenanceMuter, injected as a Stage
in the alertmanager notification pipeline. Now rules always evaluate and
everys suppression is handled by alertmanager.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:43:10 +05:30
Jatinderjit Singh
bd3d75f92a fix: maintenance ignores recurrence when fixed times also set 2026-04-27 11:43:10 +05:30
44 changed files with 658 additions and 527 deletions

View File

@@ -2933,8 +2933,8 @@ components:
type: object
PromotetypesWrappedIndex:
properties:
fieldDataType:
$ref: '#/components/schemas/TelemetrytypesFieldDataType'
column_type:
type: string
granularity:
type: integer
type:

View File

@@ -5,7 +5,12 @@ import (
"fmt"
"net/http"
"github.com/SigNoz/signoz/ee/query-service/constants"
"github.com/SigNoz/signoz/ee/query-service/model"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/featuretypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type DayWiseBreakdown struct {
@@ -65,18 +70,53 @@ func (ah *APIHandler) getBilling(w http.ResponseWriter, r *http.Request) {
return
}
data, err := ah.Signoz.Zeus.GetMeters(r.Context(), licenseKey)
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
RespondError(w, model.InternalError(err), nil)
return
}
var billing billingData
if err := json.Unmarshal(data, &billing); err != nil {
orgID := valuer.MustNewUUID(claims.OrgID)
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
useZeus := ah.Signoz.Flagger.BooleanOrEmpty(r.Context(), flagger.FeatureGetMetersFromZeus, evalCtx)
if useZeus {
data, err := ah.Signoz.Zeus.GetMeters(r.Context(), licenseKey)
if err != nil {
RespondError(w, model.InternalError(err), nil)
return
}
var billing billingData
if err := json.Unmarshal(data, &billing); err != nil {
RespondError(w, model.InternalError(err), nil)
return
}
ah.Respond(w, billing)
return
}
billingURL := fmt.Sprintf("%s/usage?licenseKey=%s", constants.LicenseSignozIo, licenseKey)
hClient := &http.Client{}
req, err := http.NewRequest("GET", billingURL, nil)
if err != nil {
RespondError(w, model.InternalError(err), nil)
return
}
req.Header.Add("X-SigNoz-SecretKey", constants.LicenseAPIKey)
billingResp, err := hClient.Do(req)
if err != nil {
RespondError(w, model.InternalError(err), nil)
return
}
ah.Respond(w, billing)
return
var billingResponse billingDetails
if err := json.NewDecoder(billingResp.Body).Decode(&billingResponse); err != nil {
RespondError(w, model.InternalError(err), nil)
return
}
ah.Respond(w, billingResponse.Data)
}

View File

@@ -49,7 +49,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
rules = append(rules, tr)
// create ch rule task for evaluation
task = newTask(baserules.TaskTypeCh, opts.TaskName, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
task = newTask(baserules.TaskTypeCh, opts.TaskName, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.OrgID)
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
@@ -73,7 +73,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
rules = append(rules, pr)
// create promql rule task for evaluation
task = newTask(baserules.TaskTypeProm, opts.TaskName, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
task = newTask(baserules.TaskTypeProm, opts.TaskName, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.OrgID)
} else if opts.Rule.RuleType == ruletypes.RuleTypeAnomaly {
// create anomaly rule
@@ -96,7 +96,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
rules = append(rules, ar)
// create anomaly rule task for evaluation
task = newTask(baserules.TaskTypeCh, opts.TaskName, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
task = newTask(baserules.TaskTypeCh, opts.TaskName, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.OrgID)
} else {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
@@ -210,9 +210,9 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, error) {
}
// newTask returns an appropriate group for the rule type
func newTask(taskType baserules.TaskType, name string, frequency time.Duration, rules []baserules.Rule, opts *baserules.ManagerOptions, notify baserules.NotifyFunc, maintenanceStore ruletypes.MaintenanceStore, orgID valuer.UUID) baserules.Task {
func newTask(taskType baserules.TaskType, name string, frequency time.Duration, rules []baserules.Rule, opts *baserules.ManagerOptions, notify baserules.NotifyFunc, orgID valuer.UUID) baserules.Task {
if taskType == baserules.TaskTypeCh {
return baserules.NewRuleTask(name, "", frequency, rules, opts, notify, maintenanceStore, orgID)
return baserules.NewRuleTask(name, "", frequency, rules, opts, notify, orgID)
}
return baserules.NewPromRuleTask(name, "", frequency, rules, opts, notify, maintenanceStore, orgID)
return baserules.NewPromRuleTask(name, "", frequency, rules, opts, notify, orgID)
}

View File

@@ -3701,7 +3701,10 @@ export interface PromotetypesPromotePathDTO {
}
export interface PromotetypesWrappedIndexDTO {
fieldDataType?: TelemetrytypesFieldDataTypeDTO;
/**
* @type string
*/
column_type?: string;
/**
* @type integer
*/

View File

@@ -10,7 +10,6 @@ import cx from 'classnames';
import { LogType } from 'components/Logs/LogStateIndicator/LogStateIndicator';
import QuerySearch from 'components/QueryBuilderV2/QueryV2/QuerySearch/QuerySearch';
import { convertExpressionToFilters } from 'components/QueryBuilderV2/utils';
import { FeatureKeys } from 'constants/features';
import { LOCALSTORAGE } from 'constants/localStorage';
import { QueryParams } from 'constants/query';
import { initialQueriesMap, PANEL_TYPES } from 'constants/queryBuilder';
@@ -47,7 +46,6 @@ import {
TextSelect,
X,
} from 'lucide-react';
import { useAppContext } from 'providers/App/App';
import { AppState } from 'store/reducers';
import { Query, TagFilter } from 'types/api/queryBuilder/queryBuilderData';
import { DataSource, StringOperators } from 'types/common/queryBuilder';
@@ -81,10 +79,6 @@ function LogDetailInner({
const [selectedView, setSelectedView] = useState<VIEWS>(selectedTab);
const [isFilterVisible, setIsFilterVisible] = useState<boolean>(false);
const { featureFlags } = useAppContext();
const isBodyJsonQueryEnabled =
featureFlags?.find((flag) => flag.name === FeatureKeys.BODY_JSON_ENABLED)
?.active || false;
const [filters, setFilters] = useState<TagFilter | null>(null);
const [isEdit, setIsEdit] = useState<boolean>(false);
@@ -214,29 +208,11 @@ function LogDetailInner({
}
};
const logBody = useMemo(() => {
if (!isBodyJsonQueryEnabled) {
return log?.body || '';
}
try {
const json = JSON.parse(log?.body || '');
if (typeof json?.message === 'string' && json.message !== '') {
return json.message;
}
return log?.body || '';
} catch (error) {
return log?.body || '';
}
}, [isBodyJsonQueryEnabled, log?.body]);
const htmlBody = useMemo(
() => ({
__html: getSanitizedLogBody(logBody || '', { shouldEscapeHtml: true }),
__html: getSanitizedLogBody(log?.body || '', { shouldEscapeHtml: true }),
}),
[logBody],
[log?.body],
);
const handleJSONCopy = (): void => {
@@ -442,7 +418,7 @@ function LogDetailInner({
<div className="log-detail-drawer__log">
<Divider type="vertical" className={cx('log-type-indicator', logType)} />
<Tooltip
title={removeEscapeCharacters(logBody)}
title={removeEscapeCharacters(log?.body)}
placement="left"
mouseLeaveDelay={0}
>

View File

@@ -9,5 +9,4 @@ export enum FeatureKeys {
ANOMALY_DETECTION = 'anomaly_detection',
ONBOARDING_V3 = 'onboarding_v3',
DOT_METRICS_ENABLED = 'dot_metrics_enabled',
BODY_JSON_ENABLED = 'body_json_enabled',
}

View File

@@ -8,19 +8,8 @@ import {
OPERATORS,
QUERY_BUILDER_FUNCTIONS,
} from 'constants/antlrQueryConstants';
import { FeatureKeys } from 'constants/features';
import { QueryParams } from 'constants/query';
import { useActiveLog } from 'hooks/logs/useActiveLog';
import { useGetSearchQueryParam } from 'hooks/queryBuilder/useGetSearchQueryParam';
import { useQueryBuilder } from 'hooks/queryBuilder/useQueryBuilder';
import { ICurrentQueryData } from 'hooks/useHandleExplorerTabChange';
import { useNotifications } from 'hooks/useNotifications';
import { ExplorerViews } from 'pages/LogsExplorer/utils';
import { useAppContext } from 'providers/App/App';
import {
BaseAutocompleteData,
DataTypes,
} from 'types/api/queryBuilder/queryAutocompleteResponse';
import { TitleWrapper } from './BodyTitleRenderer.styles';
import { DROPDOWN_KEY } from './constant';
@@ -36,32 +25,17 @@ function BodyTitleRenderer({
parentIsArray = false,
nodeKey,
value,
handleChangeSelectedView,
}: BodyTitleRendererProps): JSX.Element {
const { onAddToQuery } = useActiveLog();
const { stagedQuery, updateQueriesData } = useQueryBuilder();
const { featureFlags } = useAppContext();
const [, setCopy] = useCopyToClipboard();
const { notifications } = useNotifications();
const viewName = useGetSearchQueryParam(QueryParams.viewName) || '';
const cleanedNodeKey = removeObjectFromString(nodeKey);
const isBodyJsonQueryEnabled =
featureFlags?.find((flag) => flag.name === FeatureKeys.BODY_JSON_ENABLED)
?.active || false;
// Group by is supported only for body json query enabled and not for array elements
const isGroupBySupported =
isBodyJsonQueryEnabled && !cleanedNodeKey.includes('[]');
const filterHandler = (isFilterIn: boolean) => (): void => {
if (parentIsArray) {
onAddToQuery(
generateFieldKeyForArray(
cleanedNodeKey,
removeObjectFromString(nodeKey),
getDataTypes(value),
isBodyJsonQueryEnabled,
),
`${value}`,
isFilterIn
@@ -71,7 +45,7 @@ function BodyTitleRenderer({
);
} else {
onAddToQuery(
`body.${cleanedNodeKey}`,
`body.${removeObjectFromString(nodeKey)}`,
`${value}`,
isFilterIn ? OPERATORS['='] : OPERATORS['!='],
getDataTypes(value),
@@ -79,67 +53,10 @@ function BodyTitleRenderer({
}
};
const groupByHandler = useCallback((): void => {
if (!stagedQuery) {
return;
}
const groupByKey = parentIsArray
? generateFieldKeyForArray(
cleanedNodeKey,
getDataTypes(value),
isBodyJsonQueryEnabled,
)
: `body.${cleanedNodeKey}`;
const fieldDataType = getDataTypes(value);
const normalizedDataType: DataTypes | undefined = Object.values(
DataTypes,
).includes(fieldDataType as DataTypes)
? (fieldDataType as DataTypes)
: undefined;
const updatedQuery = updateQueriesData(
stagedQuery,
'queryData',
(item, index) => {
if (index === 0) {
const newGroupByItem: BaseAutocompleteData = {
key: groupByKey,
type: '',
dataType: normalizedDataType,
};
return { ...item, groupBy: [...(item.groupBy || []), newGroupByItem] };
}
return item;
},
);
const queryData: ICurrentQueryData = {
name: viewName,
id: updatedQuery.id,
query: updatedQuery,
};
handleChangeSelectedView?.(ExplorerViews.TIMESERIES, queryData);
}, [
cleanedNodeKey,
handleChangeSelectedView,
isBodyJsonQueryEnabled,
parentIsArray,
stagedQuery,
updateQueriesData,
value,
viewName,
]);
const onClickHandler: MenuProps['onClick'] = (props): void => {
const mapper = {
[DROPDOWN_KEY.FILTER_IN]: filterHandler(true),
[DROPDOWN_KEY.FILTER_OUT]: filterHandler(false),
[DROPDOWN_KEY.GROUP_BY]: groupByHandler,
};
const handler = mapper[props.key];
@@ -159,14 +76,6 @@ function BodyTitleRenderer({
key: DROPDOWN_KEY.FILTER_OUT,
label: `Filter out ${value}`,
},
...(isGroupBySupported
? [
{
key: DROPDOWN_KEY.GROUP_BY,
label: `Group by ${nodeKey}`,
},
]
: []),
],
onClick: onClickHandler,
};
@@ -175,6 +84,7 @@ function BodyTitleRenderer({
(e: React.MouseEvent): void => {
// Prevent tree node expansion/collapse
e.stopPropagation();
const cleanedKey = removeObjectFromString(nodeKey);
let copyText: string;
// Check if value is an object or array
@@ -196,8 +106,8 @@ function BodyTitleRenderer({
if (copyText) {
const notificationMessage = isObject
? `${cleanedNodeKey} object copied to clipboard`
: `${cleanedNodeKey} copied to clipboard`;
? `${cleanedKey} object copied to clipboard`
: `${cleanedKey} copied to clipboard`;
notifications.success({
message: notificationMessage,
@@ -205,7 +115,7 @@ function BodyTitleRenderer({
});
}
},
[cleanedNodeKey, parentIsArray, setCopy, value, notifications],
[nodeKey, parentIsArray, setCopy, value, notifications],
);
return (

View File

@@ -1,4 +1,3 @@
import { ChangeViewFunctionType } from 'container/ExplorerOptions/types';
import { MetricsType } from 'container/MetricsApplication/constant';
import { ILog } from 'types/api/logs/log';
@@ -7,7 +6,6 @@ export interface BodyTitleRendererProps {
nodeKey: string;
value: unknown;
parentIsArray?: boolean;
handleChangeSelectedView?: ChangeViewFunctionType;
}
export type AnyObject = { [key: string]: any };

View File

@@ -2,7 +2,6 @@ import React, { useCallback, useMemo, useState } from 'react';
import { useLocation } from 'react-router-dom';
import { Color } from '@signozhq/design-tokens';
import { Button, Popover, Spin, Tooltip, Tree } from 'antd';
import type { DataNode } from 'antd/es/tree';
import GroupByIcon from 'assets/CustomIcons/GroupByIcon';
import cx from 'classnames';
import CopyClipboardHOC from 'components/Logs/CopyClipboardHOC';
@@ -58,7 +57,7 @@ interface ITableViewActionsProps {
}
// Memoized Tree Component
const MemoizedTree = React.memo<{ treeData: DataNode[] }>(({ treeData }) => (
const MemoizedTree = React.memo<{ treeData: any[] }>(({ treeData }) => (
<Tree
defaultExpandAll
showLine
@@ -75,54 +74,50 @@ const BodyContent: React.FC<{
record: DataType;
bodyHtml: { __html: string };
textToCopy: string;
handleChangeSelectedView?: ChangeViewFunctionType;
}> = React.memo(
({ fieldData, record, bodyHtml, textToCopy, handleChangeSelectedView }) => {
const { isLoading, treeData, error } = useAsyncJSONProcessing(
fieldData.value,
record.field === 'body',
handleChangeSelectedView,
}> = React.memo(({ fieldData, record, bodyHtml, textToCopy }) => {
const { isLoading, treeData, error } = useAsyncJSONProcessing(
fieldData.value,
record.field === 'body',
);
// Show JSON tree if available, otherwise show HTML content
if (record.field === 'body' && treeData) {
return <MemoizedTree treeData={treeData} />;
}
if (record.field === 'body' && isLoading) {
return (
<div style={{ display: 'flex', alignItems: 'center', gap: '8px' }}>
<Spin size="small" />
<span style={{ color: Color.BG_SIENNA_400 }}>Processing JSON...</span>
</div>
);
}
// Show JSON tree if available, otherwise show HTML content
if (record.field === 'body' && treeData) {
return <MemoizedTree treeData={treeData} />;
}
if (record.field === 'body' && error) {
return (
<span
style={{ color: Color.BG_SIENNA_400, whiteSpace: 'pre-wrap', tabSize: 4 }}
>
Error parsing Body JSON
</span>
);
}
if (record.field === 'body' && isLoading) {
return (
<div style={{ display: 'flex', alignItems: 'center', gap: '8px' }}>
<Spin size="small" />
<span style={{ color: Color.BG_SIENNA_400 }}>Processing JSON...</span>
</div>
);
}
if (record.field === 'body' && error) {
return (
if (record.field === 'body') {
return (
<CopyClipboardHOC entityKey="body" textToCopy={textToCopy}>
<span
style={{ color: Color.BG_SIENNA_400, whiteSpace: 'pre-wrap', tabSize: 4 }}
>
Error parsing Body JSON
<span dangerouslySetInnerHTML={bodyHtml} />
</span>
);
}
</CopyClipboardHOC>
);
}
if (record.field === 'body') {
return (
<CopyClipboardHOC entityKey="body" textToCopy={textToCopy}>
<span
style={{ color: Color.BG_SIENNA_400, whiteSpace: 'pre-wrap', tabSize: 4 }}
>
<span dangerouslySetInnerHTML={bodyHtml} />
</span>
</CopyClipboardHOC>
);
}
return null;
},
);
return null;
});
BodyContent.displayName = 'BodyContent';
@@ -324,7 +319,6 @@ export default function TableViewActions(
record={record}
bodyHtml={bodyHtml}
textToCopy={textToCopy}
handleChangeSelectedView={handleChangeSelectedView}
/>
);
@@ -348,7 +342,6 @@ export default function TableViewActions(
fieldData,
bodyHtml,
textToCopy,
handleChangeSelectedView,
formatTimezoneAdjustedTimestamp,
cleanTimestamp,
]);
@@ -362,7 +355,6 @@ export default function TableViewActions(
record={record}
bodyHtml={bodyHtml}
textToCopy={textToCopy}
handleChangeSelectedView={handleChangeSelectedView}
/>
{!isListViewPanel &&
!RESTRICTED_SELECTED_FIELDS.includes(fieldFilterKey) && (

View File

@@ -1,8 +1,5 @@
import { useEffect, useRef, useState } from 'react';
import { FeatureKeys } from 'constants/features';
import { ChangeViewFunctionType } from 'container/ExplorerOptions/types';
import { isEmpty } from 'lodash-es';
import { useAppContext } from 'providers/App/App';
import { jsonToDataNodes, recursiveParseJSON } from '../utils';
@@ -12,7 +9,6 @@ const MAX_BODY_BYTES = 100 * 1024; // 100 KB
const useAsyncJSONProcessing = (
value: string,
shouldProcess: boolean,
handleChangeSelectedView?: ChangeViewFunctionType,
): {
isLoading: boolean;
treeData: any[] | null;
@@ -29,10 +25,6 @@ const useAsyncJSONProcessing = (
});
const processingRef = useRef<boolean>(false);
const { featureFlags } = useAppContext();
const isBodyJsonQueryEnabled =
featureFlags?.find((flag) => flag.name === FeatureKeys.BODY_JSON_ENABLED)
?.active || false;
// eslint-disable-next-line sonarjs/cognitive-complexity
useEffect((): (() => void) => {
@@ -55,10 +47,7 @@ const useAsyncJSONProcessing = (
try {
const parsedBody = recursiveParseJSON(value);
if (!isEmpty(parsedBody)) {
const treeData = jsonToDataNodes(parsedBody, {
isBodyJsonQueryEnabled,
handleChangeSelectedView,
});
const treeData = jsonToDataNodes(parsedBody);
setJsonState({ isLoading: false, treeData, error: null });
} else {
setJsonState({ isLoading: false, treeData: null, error: null });
@@ -84,10 +73,7 @@ const useAsyncJSONProcessing = (
try {
const parsedBody = recursiveParseJSON(value);
if (!isEmpty(parsedBody)) {
const treeData = jsonToDataNodes(parsedBody, {
isBodyJsonQueryEnabled,
handleChangeSelectedView,
});
const treeData = jsonToDataNodes(parsedBody);
setJsonState({ isLoading: false, treeData, error: null });
} else {
setJsonState({ isLoading: false, treeData: null, error: null });
@@ -115,7 +101,7 @@ const useAsyncJSONProcessing = (
return (): void => {
processingRef.current = false;
};
}, [value, shouldProcess, isBodyJsonQueryEnabled, handleChangeSelectedView]);
}, [value, shouldProcess]);
return jsonState;
};

View File

@@ -1,5 +1,4 @@
export const DROPDOWN_KEY = {
FILTER_IN: 'filterIn',
FILTER_OUT: 'filterOut',
GROUP_BY: 'groupBy',
};

View File

@@ -1,6 +1,5 @@
import Convert from 'ansi-to-html';
import type { DataNode } from 'antd/es/tree';
import { ChangeViewFunctionType } from 'container/ExplorerOptions/types';
import { MetricsType } from 'container/MetricsApplication/constant';
import dompurify from 'dompurify';
import { uniqueId } from 'lodash-es';
@@ -35,32 +34,13 @@ export const recursiveParseJSON = (obj: string): Record<string, unknown> => {
}
};
type JsonToDataNodesOptions = {
parentKey?: string;
parentIsArray?: boolean;
isBodyJsonQueryEnabled?: boolean;
handleChangeSelectedView?: ChangeViewFunctionType;
};
type ComputeDataNodeOptions = {
key: string;
valueIsArray: boolean;
value: unknown;
nodeKey: string;
parentIsArray: boolean;
isBodyJsonQueryEnabled?: boolean;
handleChangeSelectedView?: ChangeViewFunctionType;
};
export const computeDataNode = ({
key,
valueIsArray,
value,
nodeKey,
parentIsArray,
isBodyJsonQueryEnabled = false,
handleChangeSelectedView,
}: ComputeDataNodeOptions): DataNode => ({
export const computeDataNode = (
key: string,
valueIsArray: boolean,
value: unknown,
nodeKey: string,
parentIsArray: boolean,
): DataNode => ({
key: uniqueId(),
title: (
<BodyTitleRenderer
@@ -68,30 +48,20 @@ export const computeDataNode = ({
nodeKey={nodeKey}
value={value}
parentIsArray={parentIsArray}
handleChangeSelectedView={handleChangeSelectedView}
/>
),
children: jsonToDataNodes(value as Record<string, unknown>, {
parentKey: valueIsArray
? `${nodeKey}${isBodyJsonQueryEnabled ? '[]' : '[*]'}`
: nodeKey,
parentIsArray: valueIsArray,
isBodyJsonQueryEnabled,
handleChangeSelectedView,
}),
children: jsonToDataNodes(
value as Record<string, unknown>,
valueIsArray ? `${nodeKey}[*]` : nodeKey,
valueIsArray,
),
});
export function jsonToDataNodes(
json: Record<string, unknown>,
options: JsonToDataNodesOptions = {},
parentKey = '',
parentIsArray = false,
): DataNode[] {
const {
parentKey = '',
parentIsArray = false,
isBodyJsonQueryEnabled = false,
handleChangeSelectedView,
} = options;
return Object.entries(json).map(([key, value]) => {
let nodeKey = parentKey || key;
if (parentIsArray) {
@@ -104,15 +74,7 @@ export function jsonToDataNodes(
if (parentIsArray) {
if (typeof value === 'object' && value !== null) {
return computeDataNode({
key,
valueIsArray,
value,
nodeKey,
parentIsArray,
isBodyJsonQueryEnabled,
handleChangeSelectedView,
});
return computeDataNode(key, valueIsArray, value, nodeKey, parentIsArray);
}
return {
@@ -123,31 +85,14 @@ export function jsonToDataNodes(
nodeKey={nodeKey}
value={value}
parentIsArray={parentIsArray}
handleChangeSelectedView={handleChangeSelectedView}
/>
),
children: jsonToDataNodes(
{},
{
parentKey: nodeKey,
parentIsArray: valueIsArray,
isBodyJsonQueryEnabled,
handleChangeSelectedView,
},
),
children: jsonToDataNodes({}, nodeKey, valueIsArray),
};
}
if (typeof value === 'object' && value !== null) {
return computeDataNode({
key,
valueIsArray,
value,
nodeKey,
parentIsArray,
isBodyJsonQueryEnabled,
handleChangeSelectedView,
});
return computeDataNode(key, valueIsArray, value, nodeKey, parentIsArray);
}
return {
key: uniqueId(),
@@ -157,7 +102,6 @@ export function jsonToDataNodes(
nodeKey={nodeKey}
value={value}
parentIsArray={parentIsArray}
handleChangeSelectedView={handleChangeSelectedView}
/>
),
};
@@ -179,7 +123,6 @@ export function flattenObject(obj: AnyObject, prefix = ''): AnyObject {
export const generateFieldKeyForArray = (
fieldKey: string,
dataType: DataTypes,
isBodyJsonQueryEnabled = false,
): string => {
let lastDotIndex = fieldKey.lastIndexOf('.');
let resultNodeKey = fieldKey;
@@ -195,16 +138,6 @@ export const generateFieldKeyForArray = (
newResultNodeKey = resultNodeKey.substring(0, lastDotIndex);
}
}
// When filtering for a value inside an array, the query builder expects the
// last array segment to be referenced without the trailing `[]`.
// Examples:
// - has(body.config.features, 'fast_checkout')
// - has(body.config.features[].items, 'pen')
// - has(body.config.features[].items[].variants, 'ballpen')
if (isBodyJsonQueryEnabled && newResultNodeKey.endsWith('[]')) {
newResultNodeKey = newResultNodeKey.slice(0, -2);
}
return `body.${newResultNodeKey}`;
};

2
go.mod
View File

@@ -8,7 +8,7 @@ require (
github.com/ClickHouse/clickhouse-go/v2 v2.40.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
github.com/SigNoz/signoz-otel-collector v0.144.3
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/antonmedv/expr v1.15.3
github.com/cespare/xxhash/v2 v2.3.0

4
go.sum
View File

@@ -108,8 +108,8 @@ github.com/SigNoz/expr v1.17.7-beta h1:FyZkleM5dTQ0O6muQfwGpoH5A2ohmN/XTasRCO72g
github.com/SigNoz/expr v1.17.7-beta/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd h1:Bk43AsDYe0fhkbj57eGXx8H3ZJ4zhmQXBnrW523ktj8=
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd/go.mod h1:nxRcH/OEdM8QxzH37xkGzomr1O0JpYBRS6pwjsWW6Pc=
github.com/SigNoz/signoz-otel-collector v0.144.3 h1:/7PPIqIpPsaWtrgnfHam2XVYP41ZlgEKLHzQO8oVxcA=
github.com/SigNoz/signoz-otel-collector v0.144.3/go.mod h1:9pLVpcIQvUT88ZiNnZN/MI+XLruvwC+Xm2UzPmGjNfA=
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4 h1:EskJkEMfuuIyArWhV8SleDV/fuKxiaEGTXrCZIFqDT4=
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4/go.mod h1:9pLVpcIQvUT88ZiNnZN/MI+XLruvwC+Xm2UzPmGjNfA=
github.com/Yiling-J/theine-go v0.6.2 h1:1GeoXeQ0O0AUkiwj2S9Jc0Mzx+hpqzmqsJ4kIC4M9AY=
github.com/Yiling-J/theine-go v0.6.2/go.mod h1:08QpMa5JZ2pKN+UJCRrCasWYO1IKCdl54Xa836rpmDU=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=

View File

@@ -0,0 +1,94 @@
package alertmanagerserver
import (
"context"
"log/slog"
"sync"
"time"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/model"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
)
// MaintenanceMuter implements types.Muter for maintenance windows.
// It suppresses alerts whose ruleId label matches an active maintenance schedule.
// Results are cached for cacheTTL to avoid a DB query on every per-alert check.
type MaintenanceMuter struct {
maintenanceStore ruletypes.MaintenanceStore
orgID string
logger *slog.Logger
mu sync.RWMutex
cached []*ruletypes.PlannedMaintenance
cacheExpiry time.Time
}
const maintenanceCacheTTL = 30 * time.Second
func NewMaintenanceMuter(store ruletypes.MaintenanceStore, orgID string, logger *slog.Logger) *MaintenanceMuter {
return &MaintenanceMuter{
maintenanceStore: store,
orgID: orgID,
logger: logger,
}
}
func (m *MaintenanceMuter) Mutes(ctx context.Context, lset model.LabelSet) bool {
ruleID := string(lset[ruletypes.AlertRuleIDLabel])
if ruleID == "" {
return false
}
now := time.Now()
for _, mw := range m.getMaintenances(ctx) {
if mw.ShouldSkip(ruleID, now) {
return true
}
}
return false
}
func (m *MaintenanceMuter) getMaintenances(ctx context.Context) []*ruletypes.PlannedMaintenance {
m.mu.RLock()
if time.Now().Before(m.cacheExpiry) {
cached := m.cached
m.mu.RUnlock()
return cached
}
m.mu.RUnlock()
m.mu.Lock()
defer m.mu.Unlock()
// Double-check after acquiring write lock.
if time.Now().Before(m.cacheExpiry) {
return m.cached
}
mws, err := m.maintenanceStore.ListPlannedMaintenance(ctx, m.orgID)
if err != nil {
m.logger.ErrorContext(ctx, "failed to list planned maintenance windows; alerts will not be suppressed", slog.String("org_id", m.orgID))
return m.cached // return stale (potentially empty) cache on error
}
m.cached = mws
m.cacheExpiry = time.Now().Add(maintenanceCacheTTL)
return m.cached
}
// maintenanceMuteStage wraps MaintenanceMuter as a notify.Stage.
// We implement the stage directly rather than using notify.NewMuteStage to avoid
// a dependency on the unexported *notify.Metrics field of PipelineBuilder.
type maintenanceMuteStage struct {
muter *MaintenanceMuter
}
func (s *maintenanceMuteStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
filtered := make([]*types.Alert, 0, len(alerts))
for _, a := range alerts {
if !s.muter.Mutes(ctx, a.Labels) {
filtered = append(filtered, a)
}
}
return ctx, filtered, nil
}

View File

@@ -0,0 +1,120 @@
// Copyright (c) 2026 SigNoz, Inc.
// Copyright 2015 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package alertmanagerserver
// pipelineBuilder is a local copy of notify.PipelineBuilder that injects
// the maintenance mute stage immediately before the receiver stage.
//
// We maintain our own copy so we can control exactly where in the pipeline
// the maintenance stage runs (between the silence stage and the receiver),
// which is not possible by wrapping the output of the upstream builder.
//
// Upstream pipeline order (notify.PipelineBuilder.New, notify.go:444):
//
// GossipSettle → Inhibit → TimeActive → TimeMute → Silence → [mms] → Receiver
import (
"time"
"github.com/prometheus/alertmanager/featurecontrol"
"github.com/prometheus/alertmanager/inhibit"
"github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/timeinterval"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/client_golang/prometheus"
)
type pipelineBuilder struct {
metrics *notify.Metrics
ff featurecontrol.Flagger
muter *MaintenanceMuter
}
func newPipelineBuilder(
r prometheus.Registerer,
ff featurecontrol.Flagger,
muter *MaintenanceMuter,
) *pipelineBuilder {
return &pipelineBuilder{
metrics: notify.NewMetrics(r, ff),
ff: ff,
muter: muter,
}
}
// New returns a map of receivers to Stages, mirroring notify.PipelineBuilder.New
// but inserting a maintenanceMuteStage between the silence stage and the receiver.
func (pb *pipelineBuilder) New(
receivers map[string][]notify.Integration,
wait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silencer *silence.Silencer,
intervener *timeinterval.Intervener,
marker types.GroupMarker,
notificationLog notify.NotificationLog,
peer notify.Peer,
) notify.RoutingStage {
rs := make(notify.RoutingStage, len(receivers))
ms := notify.NewGossipSettleStage(peer)
is := notify.NewMuteStage(inhibitor, pb.metrics)
tas := notify.NewTimeActiveStage(intervener, marker, pb.metrics)
tms := notify.NewTimeMuteStage(intervener, marker, pb.metrics)
ss := notify.NewMuteStage(silencer, pb.metrics)
var mms *maintenanceMuteStage
if pb.muter != nil {
mms = &maintenanceMuteStage{muter: pb.muter}
}
for name := range receivers {
stages := notify.MultiStage{ms, is, tas, tms, ss}
if mms != nil {
stages = append(stages, mms)
}
stages = append(stages, buildReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics))
rs[name] = stages
}
pb.metrics.InitializeFor(receivers)
return rs
}
// buildReceiverStage is a copy of notify.createReceiverStage (unexported upstream).
func buildReceiverStage(
name string,
integrations []notify.Integration,
wait func() time.Duration,
notificationLog notify.NotificationLog,
metrics *notify.Metrics,
) notify.Stage {
var fs notify.FanoutStage
for i := range integrations {
recv := &nflogpb.Receiver{
GroupName: name,
Integration: integrations[i].Name(),
Idx: uint32(integrations[i].Index()),
}
var s notify.MultiStage
s = append(s, notify.NewWaitStage(wait))
s = append(s, notify.NewDedupStage(&integrations[i], notificationLog, recv))
s = append(s, notify.NewRetryStage(integrations[i], name, metrics))
s = append(s, notify.NewSetNotifiesStage(notificationLog, recv))
fs = append(fs, s)
}
return fs
}

View File

@@ -26,14 +26,13 @@ import (
"github.com/SigNoz/signoz/pkg/alertmanager/nfmanager"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
)
var (
// This is not a real file and will never be used. We need this placeholder to ensure maintenance runs on shutdown. See
// https://github.com/prometheus/server/blob/3ee2cd0f1271e277295c02b6160507b4d193dde2/silence/silence.go#L435-L438
// and https://github.com/prometheus/server/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L362.
snapfnoop string = "snapfnoop"
)
// This is not a real snapshot file and will never be used. We need this placeholder to ensure maintenance runs on shutdown.
// See https://github.com/prometheus/alertmanager/blob/3ee2cd0f1271e277295c02b6160507b4d193dde2/silence/silence.go#L435-L438
// and https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L362.
var snapfnoop string = "snapfnoop"
type Server struct {
// logger is the logger for the alertmanager
@@ -63,7 +62,7 @@ type Server struct {
silencer *silence.Silencer
silences *silence.Silences
timeIntervals map[string][]timeinterval.TimeInterval
pipelineBuilder *notify.PipelineBuilder
pipelineBuilder *pipelineBuilder
marker *alertmanagertypes.MemMarker
tmpl *template.Template
wg sync.WaitGroup
@@ -71,7 +70,16 @@ type Server struct {
notificationManager nfmanager.NotificationManager
}
func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore, nfManager nfmanager.NotificationManager) (*Server, error) {
func New(
ctx context.Context,
logger *slog.Logger,
registry prometheus.Registerer,
srvConfig Config,
orgID string,
stateStore alertmanagertypes.StateStore,
nfManager nfmanager.NotificationManager,
maintenanceStore ruletypes.MaintenanceStore,
) (*Server, error) {
server := &Server{
logger: logger.With(slog.String("pkg", "go.signoz.io/pkg/alertmanager/alertmanagerserver")),
registry: registry,
@@ -160,7 +168,6 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
return c, server.stateStore.Set(ctx, storableSilences)
})
}()
// Start maintenance for notification logs
@@ -196,7 +203,11 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
return nil, err
}
server.pipelineBuilder = notify.NewPipelineBuilder(signozRegisterer, featurecontrol.NoopFlags{})
var muter *MaintenanceMuter
if maintenanceStore != nil {
muter = NewMaintenanceMuter(maintenanceStore, orgID, server.logger)
}
server.pipelineBuilder = newPipelineBuilder(signozRegisterer, featurecontrol.NoopFlags{}, muter)
server.dispatcherMetrics = NewDispatcherMetrics(false, signozRegisterer)
return server, nil

View File

@@ -90,7 +90,7 @@ func TestEndToEndAlertManagerFlow(t *testing.T) {
stateStore := alertmanagertypestest.NewStateStore()
registry := prometheus.NewRegistry()
logger := slog.New(slog.DiscardHandler)
server, err := New(context.Background(), logger, registry, srvCfg, orgID, stateStore, notificationManager)
server, err := New(context.Background(), logger, registry, srvCfg, orgID, stateStore, notificationManager, nil)
require.NoError(t, err)
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, orgID)
require.NoError(t, err)

View File

@@ -25,7 +25,7 @@ import (
func TestServerSetConfigAndStop(t *testing.T) {
notificationManager := nfmanagertest.NewMock()
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationManager)
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationManager, nil)
require.NoError(t, err)
amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{GroupInterval: 1 * time.Minute, RepeatInterval: 1 * time.Minute, GroupWait: 1 * time.Minute}, "1")
@@ -37,7 +37,7 @@ func TestServerSetConfigAndStop(t *testing.T) {
func TestServerTestReceiverTypeWebhook(t *testing.T) {
notificationManager := nfmanagertest.NewMock()
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationManager)
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationManager, nil)
require.NoError(t, err)
amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{GroupInterval: 1 * time.Minute, RepeatInterval: 1 * time.Minute, GroupWait: 1 * time.Minute}, "1")
@@ -85,7 +85,7 @@ func TestServerPutAlerts(t *testing.T) {
srvCfg := NewConfig()
srvCfg.Route.GroupInterval = 1 * time.Second
notificationManager := nfmanagertest.NewMock()
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager)
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager, nil)
require.NoError(t, err)
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
@@ -133,7 +133,7 @@ func TestServerTestAlert(t *testing.T) {
srvCfg := NewConfig()
srvCfg.Route.GroupInterval = 1 * time.Second
notificationManager := nfmanagertest.NewMock()
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager)
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager, nil)
require.NoError(t, err)
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
@@ -238,7 +238,7 @@ func TestServerTestAlertContinuesOnFailure(t *testing.T) {
srvCfg := NewConfig()
srvCfg.Route.GroupInterval = 1 * time.Second
notificationManager := nfmanagertest.NewMock()
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager)
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager, nil)
require.NoError(t, err)
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")

View File

@@ -14,6 +14,7 @@ import (
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
)
type Service struct {
@@ -39,16 +40,18 @@ type Service struct {
serversMtx sync.RWMutex
notificationManager nfmanager.NotificationManager
maintenanceStore ruletypes.MaintenanceStore
}
func New(
ctx context.Context,
settings factory.ScopedProviderSettings,
config alertmanagerserver.Config,
stateStore alertmanagertypes.StateStore,
configStore alertmanagertypes.ConfigStore,
orgGetter organization.Getter,
nfManager nfmanager.NotificationManager,
maintenanceStore ruletypes.MaintenanceStore,
) *Service {
service := &Service{
config: config,
@@ -59,6 +62,7 @@ func New(
servers: make(map[string]*alertmanagerserver.Server),
serversMtx: sync.RWMutex{},
notificationManager: nfManager,
maintenanceStore: maintenanceStore,
}
return service
@@ -177,7 +181,10 @@ func (service *Service) newServer(ctx context.Context, orgID string) (*alertmana
return nil, err
}
server, err := alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID, service.stateStore, service.notificationManager)
server, err := alertmanagerserver.New(
ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID,
service.stateStore, service.notificationManager, service.maintenanceStore,
)
if err != nil {
return nil, err
}

View File

@@ -20,6 +20,7 @@ import (
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
"github.com/SigNoz/signoz/pkg/types/authtypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -30,35 +31,49 @@ type provider struct {
configStore alertmanagertypes.ConfigStore
stateStore alertmanagertypes.StateStore
notificationManager nfmanager.NotificationManager
maintenanceStore ruletypes.MaintenanceStore
stopC chan struct{}
}
func NewFactory(sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationManager nfmanager.NotificationManager) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
func NewFactory(
sqlstore sqlstore.SQLStore,
orgGetter organization.Getter,
notificationManager nfmanager.NotificationManager,
maintenanceStore ruletypes.MaintenanceStore,
) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) {
return New(ctx, settings, config, sqlstore, orgGetter, notificationManager)
return New(settings, config, sqlstore, orgGetter, notificationManager, maintenanceStore)
})
}
func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationManager nfmanager.NotificationManager) (*provider, error) {
func New(
providerSettings factory.ProviderSettings,
config alertmanager.Config,
sqlstore sqlstore.SQLStore,
orgGetter organization.Getter,
notificationManager nfmanager.NotificationManager,
maintenanceStore ruletypes.MaintenanceStore,
) (*provider, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager")
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
stateStore := sqlalertmanagerstore.NewStateStore(sqlstore)
p := &provider{
service: alertmanager.New(
ctx,
settings,
config.Signoz.Config,
stateStore,
configStore,
orgGetter,
notificationManager,
maintenanceStore,
),
settings: settings,
config: config,
configStore: configStore,
stateStore: stateStore,
notificationManager: notificationManager,
maintenanceStore: maintenanceStore,
stopC: make(chan struct{}),
}

View File

@@ -3,10 +3,11 @@ package flagger
import "github.com/SigNoz/signoz/pkg/types/featuretypes"
var (
FeatureUseSpanMetrics = featuretypes.MustNewName("use_span_metrics")
FeatureKafkaSpanEval = featuretypes.MustNewName("kafka_span_eval")
FeatureHideRootUser = featuretypes.MustNewName("hide_root_user")
FeaturePutMetersInZeus = featuretypes.MustNewName("put_meters_in_zeus")
FeatureUseSpanMetrics = featuretypes.MustNewName("use_span_metrics")
FeatureKafkaSpanEval = featuretypes.MustNewName("kafka_span_eval")
FeatureHideRootUser = featuretypes.MustNewName("hide_root_user")
FeatureGetMetersFromZeus = featuretypes.MustNewName("get_meters_from_zeus")
FeaturePutMetersInZeus = featuretypes.MustNewName("put_meters_in_zeus")
)
func MustNewRegistry() featuretypes.Registry {
@@ -35,6 +36,14 @@ func MustNewRegistry() featuretypes.Registry {
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
&featuretypes.Feature{
Name: FeatureGetMetersFromZeus,
Kind: featuretypes.KindBoolean,
Stage: featuretypes.StageExperimental,
Description: "Controls whether billing details are fetched from Zeus instead of the legacy subscriptions service",
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
&featuretypes.Feature{
Name: FeaturePutMetersInZeus,
Kind: featuretypes.KindBoolean,

View File

@@ -32,17 +32,27 @@ func NewModule(metadataStore telemetrytypes.MetadataStore, telemetrystore teleme
}
func (m *module) ListPromotedAndIndexedPaths(ctx context.Context) ([]promotetypes.PromotePath, error) {
indexes, err := m.metadataStore.ListLogsJSONIndexes(ctx)
logsIndexes, err := m.metadataStore.ListLogsJSONIndexes(ctx)
if err != nil {
return nil, err
}
// Flatten the map values (which are slices) into a single slice
indexes := slices.Concat(slices.Collect(maps.Values(logsIndexes))...)
aggr := map[string][]promotetypes.WrappedIndex{}
for _, index := range indexes {
aggr[index.Name] = append(aggr[index.Name], promotetypes.WrappedIndex{
FieldDataType: index.FieldDataType,
Type: index.IndexType,
Granularity: index.Granularity,
path, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(index.Expression)
if err != nil {
return nil, err
}
// clean backticks from the path
path = strings.ReplaceAll(path, "`", "")
aggr[path] = append(aggr[path], promotetypes.WrappedIndex{
ColumnType: columnType,
Type: index.Type,
Granularity: index.Granularity,
})
}
promotedPaths, err := m.listPromotedPaths(ctx)

View File

@@ -32,11 +32,10 @@ import (
)
type PrepareTaskOptions struct {
Rule *ruletypes.PostableRule
TaskName string
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
Querier querier.Querier
Rule *ruletypes.PostableRule
TaskName string
RuleStore ruletypes.RuleStore
Querier querier.Querier
Logger *slog.Logger
Cache cache.Cache
ManagerOpts *ManagerOptions
@@ -46,10 +45,9 @@ type PrepareTaskOptions struct {
}
type PrepareTestRuleOptions struct {
Rule *ruletypes.PostableRule
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
Querier querier.Querier
Rule *ruletypes.PostableRule
RuleStore ruletypes.RuleStore
Querier querier.Querier
Logger *slog.Logger
Cache cache.Cache
ManagerOpts *ManagerOptions
@@ -167,7 +165,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
rules = append(rules, tr)
// create ch rule task for evaluation
task = newTask(TaskTypeCh, opts.TaskName, taskNameSuffix, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
task = newTask(TaskTypeCh, opts.TaskName, taskNameSuffix, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.OrgID)
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
@@ -191,7 +189,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
rules = append(rules, pr)
// create promql rule task for evaluation
task = newTask(TaskTypeProm, opts.TaskName, taskNameSuffix, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
task = newTask(TaskTypeProm, opts.TaskName, taskNameSuffix, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.OrgID)
} else {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
@@ -432,9 +430,8 @@ func (m *Manager) editTask(_ context.Context, orgID valuer.UUID, rule *ruletypes
newTask, err := m.prepareTaskFunc(PrepareTaskOptions{
Rule: rule,
TaskName: taskName,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Querier: m.opts.Querier,
RuleStore: m.ruleStore,
Querier: m.opts.Querier,
Logger: m.opts.Logger,
Cache: m.cache,
ManagerOpts: m.opts,
@@ -645,9 +642,8 @@ func (m *Manager) addTask(_ context.Context, orgID valuer.UUID, rule *ruletypes.
newTask, err := m.prepareTaskFunc(PrepareTaskOptions{
Rule: rule,
TaskName: taskName,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Querier: m.opts.Querier,
RuleStore: m.ruleStore,
Querier: m.opts.Querier,
Logger: m.opts.Logger,
Cache: m.cache,
ManagerOpts: m.opts,
@@ -1030,9 +1026,8 @@ func (m *Manager) TestNotification(ctx context.Context, orgID valuer.UUID, ruleS
alertCount, err := m.prepareTestRuleFunc(PrepareTestRuleOptions{
Rule: &parsedRule,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Querier: m.opts.Querier,
RuleStore: m.ruleStore,
Querier: m.opts.Querier,
Logger: m.opts.Logger,
Cache: m.cache,
ManagerOpts: m.opts,

View File

@@ -40,13 +40,12 @@ type PromRuleTask struct {
logger *slog.Logger
notify NotifyFunc
maintenanceStore ruletypes.MaintenanceStore
orgID valuer.UUID
orgID valuer.UUID
}
// NewPromRuleTask holds rules that have promql condition
// and evaluates the rule at a given frequency
func NewPromRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, maintenanceStore ruletypes.MaintenanceStore, orgID valuer.UUID) *PromRuleTask {
func NewPromRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, orgID valuer.UUID) *PromRuleTask {
opts.Logger.Info("initiating a new rule group", "name", name, "frequency", frequency)
if frequency == 0 {
@@ -63,10 +62,9 @@ func NewPromRuleTask(name, file string, frequency time.Duration, rules []Rule, o
seriesInPreviousEval: make([]map[string]plabels.Labels, len(rules)),
done: make(chan struct{}),
terminated: make(chan struct{}),
notify: notify,
maintenanceStore: maintenanceStore,
logger: opts.Logger,
orgID: orgID,
notify: notify,
logger: opts.Logger,
orgID: orgID,
}
}
@@ -330,30 +328,12 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
}()
g.logger.InfoContext(ctx, "promql rule task", "name", g.name, "eval_started_at", ts)
maintenance, err := g.maintenanceStore.ListPlannedMaintenance(ctx, g.orgID.StringValue())
if err != nil {
g.logger.ErrorContext(ctx, "error in processing sql query", errors.Attr(err))
}
for i, rule := range g.rules {
if rule == nil {
continue
}
shouldSkip := false
for _, m := range maintenance {
g.logger.InfoContext(ctx, "checking if rule should be skipped", slog.String("rule.id", rule.ID()), slog.Any("maintenance", m))
if m.ShouldSkip(rule.ID(), ts) {
shouldSkip = true
break
}
}
if shouldSkip {
g.logger.InfoContext(ctx, "rule should be skipped", slog.String("rule.id", rule.ID()))
continue
}
select {
case <-g.done:
return

View File

@@ -38,14 +38,13 @@ type RuleTask struct {
pause bool
notify NotifyFunc
maintenanceStore ruletypes.MaintenanceStore
orgID valuer.UUID
orgID valuer.UUID
}
const DefaultFrequency = 1 * time.Minute
// NewRuleTask makes a new RuleTask with the given name, options, and rules.
func NewRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, maintenanceStore ruletypes.MaintenanceStore, orgID valuer.UUID) *RuleTask {
func NewRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, orgID valuer.UUID) *RuleTask {
if frequency == 0 {
frequency = DefaultFrequency
@@ -62,9 +61,8 @@ func NewRuleTask(name, file string, frequency time.Duration, rules []Rule, opts
logger: opts.Logger,
done: make(chan struct{}),
terminated: make(chan struct{}),
notify: notify,
maintenanceStore: maintenanceStore,
orgID: orgID,
notify: notify,
orgID: orgID,
}
}
@@ -318,31 +316,11 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
g.logger.DebugContext(ctx, "rule task eval started", "name", g.name, "start_time", ts)
maintenance, err := g.maintenanceStore.ListPlannedMaintenance(ctx, g.orgID.StringValue())
if err != nil {
g.logger.ErrorContext(ctx, "error in processing sql query", errors.Attr(err))
}
for i, rule := range g.rules {
if rule == nil {
continue
}
shouldSkip := false
for _, m := range maintenance {
g.logger.InfoContext(ctx, "checking if rule should be skipped", slog.String("rule.id", rule.ID()), slog.Any("maintenance", m))
if m.ShouldSkip(rule.ID(), ts) {
shouldSkip = true
break
}
}
if shouldSkip {
g.logger.InfoContext(ctx, "rule should be skipped", slog.String("rule.id", rule.ID()))
continue
}
select {
case <-g.done:
return

View File

@@ -4,7 +4,6 @@ import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -32,9 +31,9 @@ type Task interface {
// newTask returns an appropriate group for
// rule type
func newTask(taskType TaskType, name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, maintenanceStore ruletypes.MaintenanceStore, orgID valuer.UUID) Task {
func newTask(taskType TaskType, name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, orgID valuer.UUID) Task {
if taskType == TaskTypeCh {
return NewRuleTask(name, file, frequency, rules, opts, notify, maintenanceStore, orgID)
return NewRuleTask(name, file, frequency, rules, opts, notify, orgID)
}
return NewPromRuleTask(name, file, frequency, rules, opts, notify, maintenanceStore, orgID)
return NewPromRuleTask(name, file, frequency, rules, opts, notify, orgID)
}

View File

@@ -204,7 +204,7 @@ func AdjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemet
// Downstream query builder should handle multiple matching keys with their own metadata
// and not rely on this function to do so.
materialized := true
indexes := []telemetrytypes.TelemetryFieldKeySkipIndex{}
indexes := []telemetrytypes.JSONDataTypeIndex{}
fieldContextsSeen := map[telemetrytypes.FieldContext]bool{}
dataTypesSeen := map[telemetrytypes.FieldDataType]bool{}
for _, matchingKey := range matchingKeys {

View File

@@ -19,6 +19,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
"github.com/SigNoz/signoz/pkg/sharder"
"github.com/SigNoz/signoz/pkg/sharder/noopsharder"
"github.com/SigNoz/signoz/pkg/sqlstore"
@@ -38,7 +39,8 @@ func TestNewHandlers(t *testing.T) {
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstore), sharder)
notificationManager := nfmanagertest.NewMock()
require.NoError(t, err)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationManager)
maintenanceStore := sqlrulestore.NewMaintenanceStore(sqlstore)
alertmanager, err := signozalertmanager.New(providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationManager, maintenanceStore)
require.NoError(t, err)
tokenizer := tokenizertest.NewMockTokenizer(t)
emailing := emailingtest.New()

View File

@@ -20,6 +20,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/serviceaccount/implserviceaccount"
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
"github.com/SigNoz/signoz/pkg/sharder"
"github.com/SigNoz/signoz/pkg/sharder/noopsharder"
"github.com/SigNoz/signoz/pkg/sqlstore"
@@ -39,7 +40,8 @@ func TestNewModules(t *testing.T) {
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstore), sharder)
notificationManager := nfmanagertest.NewMock()
require.NoError(t, err)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationManager)
maintenanceStore := sqlrulestore.NewMaintenanceStore(sqlstore)
alertmanager, err := signozalertmanager.New(providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationManager, maintenanceStore)
require.NoError(t, err)
tokenizer := tokenizertest.NewMockTokenizer(t)
emailing := emailingtest.New()

View File

@@ -65,6 +65,7 @@ import (
"github.com/SigNoz/signoz/pkg/tokenizer/tokenizerstore/sqltokenizerstore"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
"github.com/SigNoz/signoz/pkg/types/featuretypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/version"
"github.com/SigNoz/signoz/pkg/web"
"github.com/SigNoz/signoz/pkg/web/noopweb"
@@ -221,9 +222,14 @@ func NewNotificationManagerProviderFactories(routeStore alertmanagertypes.RouteS
)
}
func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore, orgGetter organization.Getter, nfManager nfmanager.NotificationManager) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
func NewAlertmanagerProviderFactories(
sqlstore sqlstore.SQLStore,
orgGetter organization.Getter,
nfManager nfmanager.NotificationManager,
maintenanceStore ruletypes.MaintenanceStore,
) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
return factory.MustNewNamedMap(
signozalertmanager.NewFactory(sqlstore, orgGetter, nfManager),
signozalertmanager.NewFactory(sqlstore, orgGetter, nfManager, maintenanceStore),
)
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlschema/sqlschematest"
"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/sqlstore/sqlstoretest"
"github.com/SigNoz/signoz/pkg/statsreporter"
@@ -59,9 +60,11 @@ func TestNewProviderFactories(t *testing.T) {
})
assert.NotPanics(t, func() {
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual)), nil)
store := sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual)
orgGetter := implorganization.NewGetter(implorganization.NewStore(store), nil)
notificationManager := nfmanagertest.NewMock()
NewAlertmanagerProviderFactories(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), orgGetter, notificationManager)
maintenanceStore := sqlrulestore.NewMaintenanceStore(store)
NewAlertmanagerProviderFactories(store, orgGetter, notificationManager, maintenanceStore)
})
assert.NotPanics(t, func() {

View File

@@ -34,6 +34,7 @@ import (
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/ruler"
sqlrulestore "github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
"github.com/SigNoz/signoz/pkg/sharder"
"github.com/SigNoz/signoz/pkg/sqlmigration"
"github.com/SigNoz/signoz/pkg/sqlmigrator"
@@ -362,12 +363,14 @@ func New(
return nil, err
}
maintenanceStore := sqlrulestore.NewMaintenanceStore(sqlstore)
// Initialize alertmanager from the available alertmanager provider factories
alertmanager, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
config.Alertmanager,
NewAlertmanagerProviderFactories(sqlstore, orgGetter, nfManager),
NewAlertmanagerProviderFactories(sqlstore, orgGetter, nfManager, maintenanceStore),
config.Alertmanager.Provider,
)
if err != nil {

View File

@@ -195,8 +195,8 @@ func (c *jsonConditionBuilder) buildPrimitiveTerminalCondition(node *telemetryty
// the field genuinely holds the empty/zero value.
//
// Note: indexing is also skipped for Array Nested fields because they cannot be indexed.
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.TelemetryFieldKeySkipIndex) bool {
return telemetrytypes.MappingFieldDataTypeToJSONDataType[index.FieldDataType] == node.TerminalConfig.ElemType
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.JSONDataTypeIndex) bool {
return index.Type == node.TerminalConfig.ElemType
})
isExistsCheck := operator == qbtypes.FilterOperatorExists || operator == qbtypes.FilterOperatorNotExists
if node.TerminalConfig.ElemType.IndexSupported && indexed && !isExistsCheck {

View File

@@ -1127,12 +1127,9 @@ func buildTestTelemetryMetadataStore(t *testing.T, addIndexes bool) *telemetryty
return entry.Path == path && entry.Type == jsonType
})
if idx >= 0 {
key.Indexes = append(key.Indexes, telemetrytypes.TelemetryFieldKeySkipIndex{
Name: path,
FieldContext: telemetrytypes.FieldContextBody,
FieldDataType: fdt,
BaseColumn: LogsV2BodyV2Column,
IndexExpression: schemamigrator.JSONSubColumnIndexExpr(LogsV2BodyV2Column, path, jsonType.StringValue()),
key.Indexes = append(key.Indexes, telemetrytypes.JSONDataTypeIndex{
Type: jsonType,
ColumnExpression: schemamigrator.JSONSubColumnIndexExpr(LogsV2BodyV2Column, path, jsonType.StringValue()),
})
}
}

View File

@@ -106,7 +106,7 @@ func (t *telemetryMetaStore) buildJSONPlans(keys []*telemetrytypes.TelemetryFiel
return nil
}
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.JSONDataTypeIndex, error) {
filteredPaths := []string{}
for _, path := range paths {
// skip array paths; since they don't have any indexes
@@ -116,22 +116,47 @@ func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...st
filteredPaths = append(filteredPaths, path)
}
if len(filteredPaths) == 0 {
return make(map[string][]telemetrytypes.TelemetryFieldKeySkipIndex), nil
return make(map[string][]telemetrytypes.JSONDataTypeIndex), nil
}
// list indexes for the paths
indexes, err := t.ListLogsJSONIndexes(ctx, filteredPaths...)
indexesMap, err := t.ListLogsJSONIndexes(ctx, filteredPaths...)
if err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to list JSON path indexes")
}
// build a set of indexes
fieldPathToIndexes := make(map[string][]telemetrytypes.TelemetryFieldKeySkipIndex)
for _, index := range indexes {
fieldPathToIndexes[index.Name] = append(fieldPathToIndexes[index.Name], index)
cleanIndexes := make(map[string][]telemetrytypes.JSONDataTypeIndex)
for path, indexes := range indexesMap {
for _, index := range indexes {
columnExpr, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(index.Expression)
if err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to unfold JSON sub column index expression: %s", index.Expression)
}
jsonDataType, found := telemetrytypes.MappingStringToJSONDataType[columnType]
if !found {
t.logger.ErrorContext(ctx, "failed to map column type to JSON data type", slog.String("column_type", columnType), slog.String("column_expr", columnExpr))
continue
}
if jsonDataType == telemetrytypes.String {
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
Type: telemetrytypes.String,
ColumnExpression: columnExpr,
IndexExpression: index.Expression,
})
} else if strings.HasPrefix(index.Type, "minmax") {
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
Type: jsonDataType,
ColumnExpression: columnExpr,
IndexExpression: index.Expression,
})
}
}
}
return fieldPathToIndexes, nil
return cleanIndexes, nil
}
func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, []any) {
@@ -148,15 +173,14 @@ func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, [
filterExprs := []string{}
for _, filter := range filters {
// Remove backticks from actual expr cuz paths from metadata doesn't have backticks
filterExprs = append(filterExprs, sb.ILike("replaceAll(expr, '`', '')", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(filter))))
filterExprs = append(filterExprs, sb.ILike("expr", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(filter))))
}
sb.Where(sb.Or(filterExprs...))
return sb.BuildWithFlavor(sqlbuilder.ClickHouse)
}
func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error) {
ctx = withTelemetryContext(ctx, "ListLogsJSONIndexes")
query, args := buildListLogsJSONIndexesQuery(t.telemetrystore.Cluster(), filters...)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
@@ -165,7 +189,7 @@ func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ..
}
defer rows.Close()
indexes := []telemetrytypes.TelemetryFieldKeySkipIndex{}
indexes := make(map[string][]schemamigrator.Index)
for rows.Next() {
var name string
var typeFull string
@@ -174,39 +198,11 @@ func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ..
if err := rows.Scan(&name, &typeFull, &expr, &granularity); err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to scan string indexed column")
}
columnExpr, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(expr)
if err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to unfold JSON sub column index expression: %s", expr)
}
fdt, found := telemetrytypes.MappingJSONDataTypeToFieldDataType[columnType]
if !found {
t.logger.ErrorContext(ctx, "failed to map JSON data type to field data type", slog.String("column_type", columnType), slog.String("column_expr", columnExpr))
continue
}
baseColumn := ""
fieldName := ""
switch {
case strings.HasPrefix(columnExpr, telemetrylogs.BodyV2ColumnPrefix):
baseColumn = telemetrylogs.BodyV2ColumnPrefix
fieldName = strings.TrimPrefix(columnExpr, telemetrylogs.BodyV2ColumnPrefix)
case strings.HasPrefix(columnExpr, telemetrylogs.BodyPromotedColumnPrefix):
baseColumn = telemetrylogs.BodyPromotedColumnPrefix
fieldName = strings.TrimPrefix(columnExpr, telemetrylogs.BodyPromotedColumnPrefix)
}
fieldName = strings.ReplaceAll(fieldName, "`", "")
indexes = append(indexes, telemetrytypes.TelemetryFieldKeySkipIndex{
Name: fieldName,
FieldContext: telemetrytypes.FieldContextBody,
FieldDataType: fdt,
BaseColumn: baseColumn,
IndexName: name,
IndexType: typeFull,
IndexExpression: expr,
Granularity: int(granularity),
indexes[name] = append(indexes[name], schemamigrator.Index{
Name: name,
Type: typeFull,
Expression: expr,
Granularity: int(granularity),
})
}

View File

@@ -36,7 +36,7 @@ func TestBuildListLogsJSONIndexesQuery(t *testing.T) {
cluster: "test-cluster",
filters: []string{"foo", "bar"},
expectedSQL: "SELECT name, type_full, expr, granularity FROM clusterAllReplicas('test-cluster', system.data_skipping_indices) " +
"WHERE database = ? AND table = ? AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?)) AND (LOWER(replaceAll(expr, '`', '')) LIKE LOWER(?) OR LOWER(replaceAll(expr, '`', '')) LIKE LOWER(?))",
"WHERE database = ? AND table = ? AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?)) AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?))",
expectedArgs: []any{
telemetrylogs.DBName,
telemetrylogs.LogsV2LocalTableName,

View File

@@ -10,10 +10,10 @@ import (
)
type WrappedIndex struct {
JSONDataType telemetrytypes.JSONDataType `json:"-"`
FieldDataType telemetrytypes.FieldDataType `json:"fieldDataType"`
Type string `json:"type"`
Granularity int `json:"granularity"`
JSONDataType telemetrytypes.JSONDataType `json:"-"`
ColumnType string `json:"column_type"`
Type string `json:"type"`
Granularity int `json:"granularity"`
}
type PromotePath struct {
@@ -60,12 +60,12 @@ func (i *PromotePath) ValidateAndSetDefaults() error {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "index granularity must be greater than 0")
}
jsonDataType, ok := telemetrytypes.MappingFieldDataTypeToJSONDataType[index.FieldDataType]
jsonDataType, ok := telemetrytypes.MappingStringToJSONDataType[index.ColumnType]
if !ok {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid column type: %s", index.FieldDataType)
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid column type: %s", index.ColumnType)
}
if !jsonDataType.IndexSupported {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "index is not supported for column type: %s", index.FieldDataType)
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "index is not supported for column type: %s", index.ColumnType)
}
i.Indexes[idx].JSONDataType = jsonDataType

View File

@@ -11,9 +11,7 @@ import (
"github.com/uptrace/bun"
)
var (
ErrCodeInvalidPlannedMaintenancePayload = errors.MustNewCode("invalid_planned_maintenance_payload")
)
var ErrCodeInvalidPlannedMaintenancePayload = errors.MustNewCode("invalid_planned_maintenance_payload")
type MaintenanceStatus struct {
valuer.String
@@ -63,15 +61,15 @@ type StorablePlannedMaintenance struct {
}
type PlannedMaintenance struct {
ID valuer.UUID `json:"id" required:"true"`
Name string `json:"name" required:"true"`
Description string `json:"description"`
Schedule *Schedule `json:"schedule" required:"true"`
RuleIDs []string `json:"alertIds"`
CreatedAt time.Time `json:"createdAt"`
CreatedBy string `json:"createdBy"`
UpdatedAt time.Time `json:"updatedAt"`
UpdatedBy string `json:"updatedBy"`
ID valuer.UUID `json:"id" required:"true"`
Name string `json:"name" required:"true"`
Description string `json:"description"`
Schedule *Schedule `json:"schedule" required:"true"`
RuleIDs []string `json:"alertIds"`
CreatedAt time.Time `json:"createdAt"`
CreatedBy string `json:"createdBy"`
UpdatedAt time.Time `json:"updatedAt"`
UpdatedBy string `json:"updatedBy"`
Status MaintenanceStatus `json:"status" required:"true"`
Kind MaintenanceKind `json:"kind" required:"true"`
}
@@ -161,8 +159,11 @@ func (m *PlannedMaintenance) ShouldSkip(ruleID string, now time.Time) bool {
currentTime := now.In(loc)
// fixed schedule
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
// fixed schedule — only when no recurrence is configured.
// When recurrence is set, the recurring check below handles everything;
// falling through here would cause the window to match the absolute
// StartTimeEndTime range instead of the daily/weekly/monthly pattern.
if m.Schedule.Recurrence == nil && !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
startTime := m.Schedule.StartTime.In(loc)
endTime := m.Schedule.EndTime.In(loc)
if currentTime.Equal(startTime) || currentTime.Equal(endTime) ||
@@ -333,6 +334,11 @@ func (m *PlannedMaintenance) Validate() error {
}
if m.Schedule.Recurrence != nil {
// Fixed range and recurrence are mutually exclusive.
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload,
"cannot set both a fixed time range (startTime/endTime) and a recurrence; use one or the other")
}
if m.Schedule.Recurrence.RepeatType.IsZero() {
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload, "missing repeat type in the payload")
}

View File

@@ -1,6 +1,7 @@
package ruletypes
import (
"strings"
"testing"
"time"
@@ -13,7 +14,6 @@ func timePtr(t time.Time) *time.Time {
}
func TestShouldSkipMaintenance(t *testing.T) {
cases := []struct {
name string
maintenance *PlannedMaintenance
@@ -499,7 +499,7 @@ func TestShouldSkipMaintenance(t *testing.T) {
},
},
},
ts: time.Date(2024, 04, 1, 12, 10, 0, 0, time.UTC),
ts: time.Date(2024, 4, 1, 12, 10, 0, 0, time.UTC),
skip: true,
},
{
@@ -508,14 +508,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 04, 15, 12, 10, 0, 0, time.UTC),
ts: time.Date(2024, 4, 15, 12, 10, 0, 0, time.UTC),
skip: true,
},
{
@@ -524,14 +524,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 04, 14, 12, 10, 0, 0, time.UTC), // 14th 04 is sunday
ts: time.Date(2024, 4, 14, 12, 10, 0, 0, time.UTC), // 14th 04 is sunday
skip: false,
},
{
@@ -540,14 +540,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 04, 16, 12, 10, 0, 0, time.UTC), // 16th 04 is tuesday
ts: time.Date(2024, 4, 16, 12, 10, 0, 0, time.UTC), // 16th 04 is tuesday
skip: false,
},
{
@@ -556,14 +556,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 05, 06, 12, 10, 0, 0, time.UTC),
ts: time.Date(2024, 5, 6, 12, 10, 0, 0, time.UTC),
skip: true,
},
{
@@ -572,14 +572,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 05, 06, 14, 00, 0, 0, time.UTC),
ts: time.Date(2024, 5, 6, 14, 0, 0, 0, time.UTC),
skip: true,
},
{
@@ -588,13 +588,13 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 4, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeMonthly,
},
},
},
ts: time.Date(2024, 04, 04, 12, 10, 0, 0, time.UTC),
ts: time.Date(2024, 4, 4, 12, 10, 0, 0, time.UTC),
skip: true,
},
{
@@ -603,13 +603,13 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 4, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeMonthly,
},
},
},
ts: time.Date(2024, 04, 04, 14, 10, 0, 0, time.UTC),
ts: time.Date(2024, 4, 4, 14, 10, 0, 0, time.UTC),
skip: false,
},
{
@@ -618,13 +618,53 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 4, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeMonthly,
},
},
},
ts: time.Date(2024, 05, 04, 12, 10, 0, 0, time.UTC),
ts: time.Date(2024, 5, 4, 12, 10, 0, 0, time.UTC),
skip: true,
},
// #7548: recurring daily with Schedule.StartTime/EndTime also set.
// The recurrence should govern — not the fixed range.
{
name: "recurring-daily-with-fixed-times-outside-daily-window",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
// These fixed fields should be ignored when Recurrence is set.
StartTime: time.Date(2024, 6, 1, 10, 0, 0, 0, time.UTC),
EndTime: time.Date(2024, 6, 30, 18, 0, 0, 0, time.UTC),
Recurrence: &Recurrence{
StartTime: time.Date(2024, 6, 1, 14, 0, 0, 0, time.UTC), // daily at 14:00
Duration: valuer.MustParseTextDuration("2h"), // until 16:00
RepeatType: RepeatTypeDaily,
},
},
},
// 11:00 is inside the fixed range but outside the daily 14:00-16:00 window.
// Before the fix this returned true (bug); after fix it returns false.
ts: time.Date(2024, 6, 15, 11, 0, 0, 0, time.UTC),
skip: false,
},
{
name: "recurring-daily-with-fixed-times-inside-daily-window",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
StartTime: time.Date(2024, 6, 1, 10, 0, 0, 0, time.UTC),
EndTime: time.Date(2024, 6, 30, 18, 0, 0, 0, time.UTC),
Recurrence: &Recurrence{
StartTime: time.Date(2024, 6, 1, 14, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeDaily,
},
},
},
// 15:00 is inside the daily 14:00-16:00 window — should skip.
ts: time.Date(2024, 6, 15, 15, 0, 0, 0, time.UTC),
skip: true,
},
}
@@ -636,3 +676,27 @@ func TestShouldSkipMaintenance(t *testing.T) {
}
}
}
// #7548: Validate rejects payloads that set both fixed range and recurrence.
func TestValidate_FixedAndRecurrenceMutuallyExclusive(t *testing.T) {
m := PlannedMaintenance{
Name: "test",
Schedule: &Schedule{
Timezone: "UTC",
StartTime: time.Now().Add(-time.Hour),
EndTime: time.Now().Add(time.Hour),
Recurrence: &Recurrence{
StartTime: time.Now().Add(-time.Hour),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeDaily,
},
},
}
err := m.Validate()
if err == nil {
t.Fatal("expected validation error when both fixed range and recurrence are set")
}
if !strings.Contains(err.Error(), "cannot set both") {
t.Errorf("unexpected error message: %v", err)
}
}

View File

@@ -37,9 +37,9 @@ type TelemetryFieldKey struct {
FieldContext FieldContext `json:"fieldContext,omitzero"`
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
JSONPlan JSONAccessPlan `json:"-"`
Indexes []TelemetryFieldKeySkipIndex `json:"-"`
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
JSONPlan JSONAccessPlan `json:"-"`
Indexes []JSONDataTypeIndex `json:"-"`
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
Evolutions []*EvolutionEntry `json:"-"`
}
@@ -102,7 +102,7 @@ func (f TelemetryFieldKey) String() string {
if i > 0 {
sb.WriteString("; ")
}
fmt.Fprintf(&sb, "{type=%s, indexExpr=%s}", MappingFieldDataTypeToJSONDataType[index.FieldDataType].StringValue(), index.IndexExpression)
fmt.Fprintf(&sb, "{type=%s, columnExpr=%s, indexExpr=%s}", index.Type.StringValue(), index.ColumnExpression, index.IndexExpression)
}
sb.WriteString("]")
}
@@ -400,14 +400,3 @@ func NewFieldValueSelectorFromPostableFieldValueParams(params PostableFieldValue
return fieldValueSelector
}
type TelemetryFieldKeySkipIndex struct {
Name string `json:"name"` // Name is TelemetryFieldKey.Name not IndexName from ClickHouse
FieldContext FieldContext `json:"fieldContext,omitzero"`
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
BaseColumn string `json:"baseColumn"`
IndexName string `json:"indexName"`
IndexType string `json:"indexType"`
IndexExpression string `json:"indexExpression"`
Granularity int `json:"granularity"`
}

View File

@@ -1,5 +1,11 @@
package telemetrytypes
type JSONDataTypeIndex struct {
Type JSONDataType
ColumnExpression string
IndexExpression string
}
type JSONDataType struct {
str string // Store the correct case for ClickHouse
IsArray bool
@@ -26,17 +32,18 @@ var (
ArrayJSON = JSONDataType{"Array(JSON)", true, "JSON", false}
)
var MappingJSONDataTypeToFieldDataType = map[string]FieldDataType{
"String": FieldDataTypeString,
"Int64": FieldDataTypeInt64,
"Float64": FieldDataTypeFloat64,
"Bool": FieldDataTypeBool,
"Array(Nullable(String))": FieldDataTypeArrayString,
"Array(Nullable(Int64))": FieldDataTypeArrayInt64,
"Array(Nullable(Float64))": FieldDataTypeArrayFloat64,
"Array(Nullable(Bool))": FieldDataTypeArrayBool,
"Array(Dynamic)": FieldDataTypeArrayDynamic,
"Array(JSON)": FieldDataTypeArrayJSON,
var MappingStringToJSONDataType = map[string]JSONDataType{
"String": String,
"Int64": Int64,
"Float64": Float64,
"Bool": Bool,
"Dynamic": Dynamic,
"Array(Nullable(String))": ArrayString,
"Array(Nullable(Int64))": ArrayInt64,
"Array(Nullable(Float64))": ArrayFloat64,
"Array(Nullable(Bool))": ArrayBool,
"Array(Dynamic)": ArrayDynamic,
"Array(JSON)": ArrayJSON,
}
var MappingFieldDataTypeToJSONDataType = map[FieldDataType]JSONDataType{

View File

@@ -3,6 +3,7 @@ package telemetrytypes
import (
"context"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
)
@@ -34,7 +35,7 @@ type MetadataStore interface {
FetchTemporalityAndTypeMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error)
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]TelemetryFieldKeySkipIndex, error)
ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error)
// ListPromotedPaths lists the promoted paths.
GetPromotedPaths(ctx context.Context, paths ...string) (map[string]bool, error)

View File

@@ -4,6 +4,7 @@ import (
"context"
"strings"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
@@ -17,7 +18,7 @@ type MockMetadataStore struct {
TemporalityMap map[string]metrictypes.Temporality
TypeMap map[string]metrictypes.Type
PromotedPathsMap map[string]bool
LogsJSONIndexes []telemetrytypes.TelemetryFieldKeySkipIndex
LogsJSONIndexesMap map[string][]schemamigrator.Index
ColumnEvolutionMetadataMap map[string][]*telemetrytypes.EvolutionEntry
LookupKeysMap map[telemetrytypes.MetricMetadataLookupKey]int64
// StaticFields holds signal-specific intrinsic field definitions (e.g. telemetrylogs.IntrinsicFields).
@@ -33,7 +34,7 @@ func NewMockMetadataStore() *MockMetadataStore {
TemporalityMap: make(map[string]metrictypes.Temporality),
TypeMap: make(map[string]metrictypes.Type),
PromotedPathsMap: make(map[string]bool),
LogsJSONIndexes: []telemetrytypes.TelemetryFieldKeySkipIndex{},
LogsJSONIndexesMap: make(map[string][]schemamigrator.Index),
ColumnEvolutionMetadataMap: make(map[string][]*telemetrytypes.EvolutionEntry),
LookupKeysMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
StaticFields: make(map[string]telemetrytypes.TelemetryFieldKey),
@@ -368,8 +369,8 @@ func (m *MockMetadataStore) GetPromotedPaths(ctx context.Context, paths ...strin
}
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
func (m *MockMetadataStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
return m.LogsJSONIndexes, nil
func (m *MockMetadataStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error) {
return m.LogsJSONIndexesMap, nil
}
func (m *MockMetadataStore) updateColumnEvolutionMetadataForKeys(_ context.Context, keysToUpdate []*telemetrytypes.TelemetryFieldKey) map[string][]*telemetrytypes.EvolutionEntry {