mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-11 12:40:36 +01:00
Compare commits
1 Commits
postproces
...
issue_4863
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ef748e9ce2 |
@@ -50,7 +50,6 @@ import {
|
||||
import { JsonView } from 'periscope/components/JsonView';
|
||||
import { useAppContext } from 'providers/App/App';
|
||||
import { AppState } from 'store/reducers';
|
||||
import { ILogBody } from 'types/api/logs/log';
|
||||
import { Query, TagFilter } from 'types/api/queryBuilder/queryBuilderData';
|
||||
import { DataSource, StringOperators } from 'types/common/queryBuilder';
|
||||
import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
@@ -218,17 +217,20 @@ function LogDetailInner({
|
||||
|
||||
const logBody = useMemo(() => {
|
||||
if (!isBodyJsonQueryEnabled) {
|
||||
return (log?.body as string) ?? '';
|
||||
return log?.body || '';
|
||||
}
|
||||
// Feature enabled: body is always a map; message is always a string
|
||||
const bodyObj = log?.body as ILogBody;
|
||||
if (!bodyObj) {
|
||||
return '';
|
||||
|
||||
try {
|
||||
const json = JSON.parse(log?.body || '');
|
||||
|
||||
if (typeof json?.message === 'string' && json.message !== '') {
|
||||
return json.message;
|
||||
}
|
||||
|
||||
return log?.body || '';
|
||||
} catch (error) {
|
||||
return log?.body || '';
|
||||
}
|
||||
if (bodyObj.message) {
|
||||
return bodyObj.message;
|
||||
}
|
||||
return JSON.stringify(bodyObj);
|
||||
}, [isBodyJsonQueryEnabled, log?.body]);
|
||||
|
||||
const htmlBody = useMemo(
|
||||
|
||||
@@ -9,10 +9,7 @@ import { Color } from '@signozhq/design-tokens';
|
||||
import { Tooltip } from 'antd';
|
||||
import { VIEW_TYPES } from 'components/LogDetail/constants';
|
||||
import { DATE_TIME_FORMATS } from 'constants/dateTimeFormats';
|
||||
import {
|
||||
getBodyDisplayString,
|
||||
getSanitizedLogBody,
|
||||
} from 'container/LogDetailedView/utils';
|
||||
import { getSanitizedLogBody } from 'container/LogDetailedView/utils';
|
||||
import { useCopyLogLink } from 'hooks/logs/useCopyLogLink';
|
||||
// hooks
|
||||
import { useIsDarkMode } from 'hooks/useDarkMode';
|
||||
@@ -102,7 +99,7 @@ function RawLogView({
|
||||
// Check if body is selected
|
||||
const showBody = selectedFields.some((field) => field.name === 'body');
|
||||
if (showBody) {
|
||||
parts.push(`${attributesText} ${getBodyDisplayString(data.body)}`);
|
||||
parts.push(`${attributesText} ${data.body}`);
|
||||
} else {
|
||||
parts.push(attributesText);
|
||||
}
|
||||
|
||||
@@ -2,10 +2,7 @@ import type { ReactElement } from 'react';
|
||||
import { useMemo } from 'react';
|
||||
import TanStackTable from 'components/TanStackTableView';
|
||||
import { DATE_TIME_FORMATS } from 'constants/dateTimeFormats';
|
||||
import {
|
||||
getBodyDisplayString,
|
||||
getSanitizedLogBody,
|
||||
} from 'container/LogDetailedView/utils';
|
||||
import { getSanitizedLogBody } from 'container/LogDetailedView/utils';
|
||||
import { FontSize } from 'container/OptionsMenu/types';
|
||||
import { FlatLogData } from 'lib/logs/flatLogData';
|
||||
import { useTimezone } from 'providers/Timezone';
|
||||
@@ -90,7 +87,7 @@ export function useLogsTableColumns({
|
||||
? {
|
||||
id: 'body',
|
||||
header: 'Body',
|
||||
accessorFn: (log): string => getBodyDisplayString(log.body),
|
||||
accessorFn: (log): string => log.body,
|
||||
canBeHidden: false,
|
||||
width: { default: '100%', min: 300 },
|
||||
cell: ({ value, isActive }): ReactElement => (
|
||||
|
||||
@@ -19,7 +19,6 @@ import {
|
||||
import { useQueryBuilder } from 'hooks/queryBuilder/useQueryBuilder';
|
||||
import { GetMetricQueryRange } from 'lib/dashboard/getQueryResults';
|
||||
import { isArray } from 'lodash-es';
|
||||
import { getBodyDisplayString } from 'container/LogDetailedView/utils';
|
||||
import { ChevronDown, ChevronLeft, ChevronRight, Loader2 } from 'lucide-react';
|
||||
import { IBuilderQuery } from 'types/api/queryBuilder/queryBuilderData';
|
||||
import { DataSource } from 'types/common/queryBuilder';
|
||||
@@ -174,7 +173,7 @@ export default function Events({
|
||||
(event): EventDataType => ({
|
||||
timestamp: event.timestamp,
|
||||
severity: event.data.severity_text,
|
||||
body: getBodyDisplayString(event.data.body),
|
||||
body: event.data.body,
|
||||
id: event.data.id,
|
||||
key: event.data.id,
|
||||
resources_string: event.data.resources_string,
|
||||
|
||||
@@ -14,7 +14,7 @@ import { ILog } from 'types/api/logs/log';
|
||||
|
||||
import { ActionItemProps } from './ActionItem';
|
||||
import TableView from './TableView';
|
||||
import { getBodyDisplayString, removeEscapeCharacters } from './utils';
|
||||
import { removeEscapeCharacters } from './utils';
|
||||
|
||||
import './Overview.styles.scss';
|
||||
|
||||
@@ -113,7 +113,7 @@ function Overview({
|
||||
children: (
|
||||
<div className="logs-body-content">
|
||||
<MEditor
|
||||
value={removeEscapeCharacters(getBodyDisplayString(logData.body))}
|
||||
value={removeEscapeCharacters(logData.body)}
|
||||
language="json"
|
||||
options={options}
|
||||
onChange={(): void => {}}
|
||||
|
||||
@@ -107,20 +107,10 @@ function TableView({
|
||||
isListViewPanel,
|
||||
]);
|
||||
|
||||
// When USE_JSON_BODY is enabled, body arrives as a pre-parsed object. Serialize it
|
||||
// back to a string so flattenObject keeps `body` as a single table row instead of
|
||||
// recursively expanding it into dotted sub-keys (body.message, body.foo.bar, …),
|
||||
// which would break the tree view in BodyContent that relies on record.field === 'body'.
|
||||
const flattenLogData: Record<string, string> | null = useMemo(() => {
|
||||
if (!logData) {
|
||||
return null;
|
||||
}
|
||||
const normalizedLog =
|
||||
typeof logData.body === 'object' && logData.body !== null
|
||||
? { ...logData, body: JSON.stringify(logData.body) }
|
||||
: logData;
|
||||
return flattenObject(normalizedLog);
|
||||
}, [logData]);
|
||||
const flattenLogData: Record<string, string> | null = useMemo(
|
||||
() => (logData ? flattenObject(logData) : null),
|
||||
[logData],
|
||||
);
|
||||
|
||||
const handleClick = (
|
||||
operator: string,
|
||||
|
||||
@@ -10,7 +10,7 @@ const MAX_BODY_BYTES = 100 * 1024; // 100 KB
|
||||
|
||||
// Hook for async JSON processing
|
||||
const useAsyncJSONProcessing = (
|
||||
value: string | Record<string, unknown>,
|
||||
value: string,
|
||||
shouldProcess: boolean,
|
||||
handleChangeSelectedView?: ChangeViewFunctionType,
|
||||
): {
|
||||
@@ -40,17 +40,11 @@ const useAsyncJSONProcessing = (
|
||||
return (): void => {};
|
||||
}
|
||||
|
||||
// When value is already a parsed object skip the size check and JSON parsing
|
||||
const parseBody = (): Record<string, unknown> | null => {
|
||||
if (typeof value === 'object' && value !== null) {
|
||||
return value as Record<string, unknown>;
|
||||
}
|
||||
const byteSize = new Blob([value as string]).size;
|
||||
if (byteSize > MAX_BODY_BYTES) {
|
||||
return null;
|
||||
}
|
||||
return recursiveParseJSON(value as string);
|
||||
};
|
||||
// Avoid processing if the json is too large
|
||||
const byteSize = new Blob([value]).size;
|
||||
if (byteSize > MAX_BODY_BYTES) {
|
||||
return (): void => {};
|
||||
}
|
||||
|
||||
processingRef.current = true;
|
||||
setJsonState({ isLoading: true, treeData: null, error: null });
|
||||
@@ -59,8 +53,8 @@ const useAsyncJSONProcessing = (
|
||||
const processAsync = (): void => {
|
||||
setTimeout(() => {
|
||||
try {
|
||||
const parsedBody = parseBody();
|
||||
if (parsedBody && !isEmpty(parsedBody)) {
|
||||
const parsedBody = recursiveParseJSON(value);
|
||||
if (!isEmpty(parsedBody)) {
|
||||
const treeData = jsonToDataNodes(parsedBody, {
|
||||
isBodyJsonQueryEnabled,
|
||||
handleChangeSelectedView,
|
||||
@@ -88,8 +82,8 @@ const useAsyncJSONProcessing = (
|
||||
// eslint-disable-next-line sonarjs/no-identical-functions
|
||||
(): void => {
|
||||
try {
|
||||
const parsedBody = parseBody();
|
||||
if (parsedBody && !isEmpty(parsedBody)) {
|
||||
const parsedBody = recursiveParseJSON(value);
|
||||
if (!isEmpty(parsedBody)) {
|
||||
const treeData = jsonToDataNodes(parsedBody, {
|
||||
isBodyJsonQueryEnabled,
|
||||
handleChangeSelectedView,
|
||||
|
||||
@@ -4,11 +4,7 @@ import { ChangeViewFunctionType } from 'container/ExplorerOptions/types';
|
||||
import { MetricsType } from 'container/MetricsApplication/constant';
|
||||
import dompurify from 'dompurify';
|
||||
import { uniqueId } from 'lodash-es';
|
||||
import {
|
||||
ILog,
|
||||
ILogAggregateAttributesResources,
|
||||
ILogBody,
|
||||
} from 'types/api/logs/log';
|
||||
import { ILog, ILogAggregateAttributesResources } from 'types/api/logs/log';
|
||||
import { DataTypes } from 'types/api/queryBuilder/queryAutocompleteResponse';
|
||||
import { FORBID_DOM_PURIFY_ATTR, FORBID_DOM_PURIFY_TAGS } from 'utils/app';
|
||||
|
||||
@@ -437,24 +433,3 @@ export const getSanitizedLogBody = (
|
||||
return '{}';
|
||||
}
|
||||
};
|
||||
|
||||
// Returns a plain string for display contexts (Monaco editor, table cells, raw log row).
|
||||
export function getBodyDisplayString(body: string | ILogBody): string {
|
||||
return typeof body === 'string' ? body : JSON.stringify(body as ILogBody);
|
||||
}
|
||||
|
||||
// Returns the primary "message" text for compact log row previews.
|
||||
export function getBodyMessage(
|
||||
body: string | ILogBody,
|
||||
isBodyJsonEnabled: boolean,
|
||||
): string {
|
||||
if (!isBodyJsonEnabled) {
|
||||
return (body as string) ?? '';
|
||||
}
|
||||
// Feature enabled: body is always a map; message is always a string
|
||||
const msg = (body as ILogBody).message;
|
||||
if (msg) {
|
||||
return msg;
|
||||
}
|
||||
return JSON.stringify(body);
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ import { ExpandAltOutlined } from '@ant-design/icons';
|
||||
import LogDetail from 'components/LogDetail';
|
||||
import { VIEW_TYPES } from 'components/LogDetail/constants';
|
||||
import { DATE_TIME_FORMATS } from 'constants/dateTimeFormats';
|
||||
import { getBodyDisplayString } from 'container/LogDetailedView/utils';
|
||||
import { useActiveLog } from 'hooks/logs/useActiveLog';
|
||||
import { useTimezone } from 'providers/Timezone';
|
||||
import { ILog } from 'types/api/logs/log';
|
||||
@@ -27,9 +26,7 @@ function LogsList({ logs }: LogsListProps): JSX.Element {
|
||||
DATE_TIME_FORMATS.UTC_MONTH_SHORT,
|
||||
)}
|
||||
</div>
|
||||
<div className="logs-preview-list-item-body">
|
||||
{getBodyDisplayString(log.body)}
|
||||
</div>
|
||||
<div className="logs-preview-list-item-body">{log.body}</div>
|
||||
<div
|
||||
className="logs-preview-list-item-expand"
|
||||
onClick={makeLogDetailsHandler(log)}
|
||||
|
||||
@@ -1,8 +1,3 @@
|
||||
export interface ILogBody {
|
||||
message?: string | null;
|
||||
[key: string]: unknown;
|
||||
}
|
||||
|
||||
export interface ILog {
|
||||
date: string;
|
||||
timestamp: number | string;
|
||||
@@ -13,7 +8,7 @@ export interface ILog {
|
||||
traceFlags: number;
|
||||
severityText: string;
|
||||
severityNumber: number;
|
||||
body: string | ILogBody;
|
||||
body: string;
|
||||
resources_string: Record<string, never>;
|
||||
scope_string: Record<string, never>;
|
||||
attributesString: Record<string, never>;
|
||||
|
||||
2
go.mod
2
go.mod
@@ -11,7 +11,6 @@ require (
|
||||
github.com/SigNoz/signoz-otel-collector v0.144.3
|
||||
github.com/antlr4-go/antlr/v4 v4.13.1
|
||||
github.com/antonmedv/expr v1.15.3
|
||||
github.com/bytedance/sonic v1.14.1
|
||||
github.com/cespare/xxhash/v2 v2.3.0
|
||||
github.com/coreos/go-oidc/v3 v3.17.0
|
||||
github.com/dgraph-io/ristretto/v2 v2.3.0
|
||||
@@ -113,6 +112,7 @@ require (
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 // indirect
|
||||
github.com/aws/smithy-go v1.24.2 // indirect
|
||||
github.com/bytedance/gopkg v0.1.3 // indirect
|
||||
github.com/bytedance/sonic v1.14.1 // indirect
|
||||
github.com/bytedance/sonic/loader v0.3.0 // indirect
|
||||
github.com/cloudwego/base64x v0.1.6 // indirect
|
||||
github.com/emersion/go-sasl v0.0.0-20241020182733-b788ff22d5a6 // indirect
|
||||
|
||||
@@ -12,10 +12,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/bytedance/sonic"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -24,8 +22,6 @@ var (
|
||||
// written clickhouse query. The column alias indcate which value is
|
||||
// to be considered as final result (or target).
|
||||
legacyReservedColumnTargetAliases = []string{"__result", "__value", "result", "res", "value"}
|
||||
|
||||
CodeFailUnmarshalJSONColumn = errors.MustNewCode("fail_unmarshal_json_column")
|
||||
)
|
||||
|
||||
// consume reads every row and shapes it into the payload expected for the
|
||||
@@ -397,16 +393,11 @@ func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
|
||||
|
||||
// de-reference the typed pointer to any
|
||||
val := reflect.ValueOf(cellPtr).Elem().Interface()
|
||||
// Post-process JSON columns: unmarshal bytes into map[string]any
|
||||
// Post-process JSON columns: normalize into String value
|
||||
if strings.HasPrefix(strings.ToUpper(colTypes[i].DatabaseTypeName()), "JSON") {
|
||||
switch x := val.(type) {
|
||||
case []byte:
|
||||
var m map[string]any
|
||||
err := sonic.Unmarshal(x, &m)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailUnmarshalJSONColumn, "failed to unmarshal JSON column %s", name)
|
||||
}
|
||||
val = m
|
||||
val = string(x)
|
||||
default:
|
||||
// already a structured type (map[string]any, []any, etc.)
|
||||
}
|
||||
|
||||
@@ -12,12 +12,9 @@ import (
|
||||
"github.com/SigNoz/govaluate"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/types/featuretypes"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// queryInfo holds common query properties.
|
||||
@@ -53,7 +50,7 @@ func getQueryName(spec any) string {
|
||||
return getqueryInfo(spec).Name
|
||||
}
|
||||
|
||||
func (q *querier) postProcessResults(ctx context.Context, orgID valuer.UUID, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
|
||||
func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
|
||||
// Convert results to typed format for processing
|
||||
typedResults := make(map[string]*qbtypes.Result)
|
||||
for name, result := range results {
|
||||
@@ -72,7 +69,6 @@ func (q *querier) postProcessResults(ctx context.Context, orgID valuer.UUID, res
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
|
||||
if result, ok := typedResults[spec.Name]; ok {
|
||||
result = postProcessBuilderQuery(q, result, spec, req)
|
||||
result = q.postProcessLogBody(ctx, orgID, result, req)
|
||||
typedResults[spec.Name] = result
|
||||
}
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
|
||||
@@ -1039,33 +1035,3 @@ func (q *querier) calculateFormulaStep(expression string, req *qbtypes.QueryRang
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// postProcessLogBody removes the "message" key from the body map when it is empty.
|
||||
// Only runs for raw list queries with the use_json_body feature enabled.
|
||||
func (q *querier) postProcessLogBody(ctx context.Context, orgID valuer.UUID, result *qbtypes.Result, req *qbtypes.QueryRangeRequest) *qbtypes.Result {
|
||||
if req.RequestType != qbtypes.RequestTypeRaw {
|
||||
return result
|
||||
}
|
||||
if !q.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, featuretypes.NewFlaggerEvaluationContext(orgID)) {
|
||||
return result
|
||||
}
|
||||
rawData, ok := result.Value.(*qbtypes.RawData)
|
||||
if !ok {
|
||||
return result
|
||||
}
|
||||
for _, row := range rawData.Rows {
|
||||
bodyMap, ok := row.Data["body"].(map[string]any)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if msg, exists := bodyMap["message"]; exists {
|
||||
switch v := msg.(type) {
|
||||
case string:
|
||||
if v == "" {
|
||||
delete(bodyMap, "message")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
@@ -36,7 +35,6 @@ var (
|
||||
|
||||
type querier struct {
|
||||
logger *slog.Logger
|
||||
fl flagger.Flagger
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
metadataStore telemetrytypes.MetadataStore
|
||||
promEngine prometheus.Prometheus
|
||||
@@ -64,12 +62,10 @@ func New(
|
||||
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
|
||||
traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder,
|
||||
bucketCache BucketCache,
|
||||
flagger flagger.Flagger,
|
||||
) *querier {
|
||||
querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier")
|
||||
return &querier{
|
||||
logger: querierSettings.Logger(),
|
||||
fl: flagger,
|
||||
telemetryStore: telemetryStore,
|
||||
metadataStore: metadataStore,
|
||||
promEngine: promEngine,
|
||||
@@ -688,7 +684,7 @@ func (q *querier) run(
|
||||
}
|
||||
|
||||
gomaps.Copy(results, preseededResults)
|
||||
processedResults, err := q.postProcessResults(ctx, orgID, results, req)
|
||||
processedResults, err := q.postProcessResults(ctx, results, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
@@ -45,15 +44,14 @@ func TestQueryRange_MetricTypeMissing(t *testing.T) {
|
||||
providerSettings,
|
||||
nil, // telemetryStore
|
||||
metadataStore,
|
||||
nil, // prometheus
|
||||
nil, // traceStmtBuilder
|
||||
nil, // logStmtBuilder
|
||||
nil, // auditStmtBuilder
|
||||
nil, // metricStmtBuilder
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flaggertest.New(t), // flagger
|
||||
nil, // prometheus
|
||||
nil, // traceStmtBuilder
|
||||
nil, // logStmtBuilder
|
||||
nil, // auditStmtBuilder
|
||||
nil, // metricStmtBuilder
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
)
|
||||
|
||||
req := &qbtypes.QueryRangeRequest{
|
||||
@@ -118,7 +116,6 @@ func TestQueryRange_MetricTypeFromStore(t *testing.T) {
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flaggertest.New(t), // flagger
|
||||
)
|
||||
|
||||
req := &qbtypes.QueryRangeRequest{
|
||||
|
||||
@@ -186,6 +186,5 @@ func newProvider(
|
||||
meterStmtBuilder,
|
||||
traceOperatorStmtBuilder,
|
||||
bucketCache,
|
||||
flagger,
|
||||
), nil
|
||||
}
|
||||
|
||||
@@ -1343,7 +1343,7 @@ func getLocalTableName(tableName string) string {
|
||||
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, userID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
|
||||
instrumentationtypes.CodeNamespace: "clickhouse-reader",
|
||||
@@ -1434,6 +1434,10 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
},
|
||||
UserAuditable: types.UserAuditable{
|
||||
CreatedBy: userID,
|
||||
UpdatedBy: userID,
|
||||
},
|
||||
TransactionID: uuid,
|
||||
TableName: tableName,
|
||||
TTL: int(params.DelDuration),
|
||||
@@ -1511,7 +1515,7 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
|
||||
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, userID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalTraces.StringValue(),
|
||||
instrumentationtypes.CodeNamespace: "clickhouse-reader",
|
||||
@@ -1572,6 +1576,10 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
},
|
||||
UserAuditable: types.UserAuditable{
|
||||
CreatedBy: userID,
|
||||
UpdatedBy: userID,
|
||||
},
|
||||
TransactionID: uuid,
|
||||
TableName: tableName,
|
||||
TTL: int(params.DelDuration),
|
||||
@@ -1687,7 +1695,7 @@ func (r *ClickHouseReader) hasCustomRetentionColumn(ctx context.Context) (bool,
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error) {
|
||||
func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, userID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error) {
|
||||
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
|
||||
@@ -1718,7 +1726,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
||||
ttlParams.ToColdStorageDuration = 0
|
||||
}
|
||||
|
||||
ttlResult, apiErr := r.SetTTL(ctx, orgID, ttlParams)
|
||||
ttlResult, apiErr := r.SetTTL(ctx, orgID, userID, ttlParams)
|
||||
if apiErr != nil {
|
||||
return nil, errorsV2.Wrapf(apiErr.Err, errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to set standard TTL")
|
||||
}
|
||||
@@ -1847,6 +1855,10 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
},
|
||||
UserAuditable: types.UserAuditable{
|
||||
CreatedBy: userID,
|
||||
UpdatedBy: userID,
|
||||
},
|
||||
TransactionID: uuid,
|
||||
TableName: tableName,
|
||||
TTL: params.DefaultTTLDays,
|
||||
@@ -2185,24 +2197,24 @@ func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditi
|
||||
// SetTTL sets the TTL for traces or metrics or logs tables.
|
||||
// This is an async API which creates goroutines to set TTL.
|
||||
// Status of TTL update is tracked with ttl_status table in sqlite db.
|
||||
func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, userID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
// Keep only latest 100 transactions/requests
|
||||
r.deleteTtlTransactions(ctx, orgID, 100)
|
||||
|
||||
switch params.Type {
|
||||
case constants.TraceTTL:
|
||||
return r.setTTLTraces(ctx, orgID, params)
|
||||
return r.setTTLTraces(ctx, orgID, userID, params)
|
||||
case constants.MetricsTTL:
|
||||
return r.setTTLMetrics(ctx, orgID, params)
|
||||
return r.setTTLMetrics(ctx, orgID, userID, params)
|
||||
case constants.LogsTTL:
|
||||
return r.setTTLLogs(ctx, orgID, params)
|
||||
return r.setTTLLogs(ctx, orgID, userID, params)
|
||||
default:
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v", params.Type)}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, userID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalMetrics.StringValue(),
|
||||
instrumentationtypes.CodeNamespace: "clickhouse-reader",
|
||||
@@ -2244,6 +2256,10 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
},
|
||||
UserAuditable: types.UserAuditable{
|
||||
CreatedBy: userID,
|
||||
UpdatedBy: userID,
|
||||
},
|
||||
TransactionID: uuid,
|
||||
TableName: tableName,
|
||||
TTL: int(params.DelDuration),
|
||||
|
||||
@@ -1655,7 +1655,7 @@ func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// Context is not used here as TTL is long duration DB operation
|
||||
result, apiErr := aH.reader.SetTTL(context.Background(), claims.OrgID, ttlParams)
|
||||
result, apiErr := aH.reader.SetTTL(context.Background(), claims.OrgID, claims.UserID, ttlParams)
|
||||
if apiErr != nil {
|
||||
if apiErr.Typ == model.ErrorConflict {
|
||||
aH.HandleError(w, apiErr.Err, http.StatusConflict)
|
||||
@@ -1684,7 +1684,7 @@ func (aH *APIHandler) setCustomRetentionTTL(w http.ResponseWriter, r *http.Reque
|
||||
}
|
||||
|
||||
// Context is not used here as TTL is long duration DB operation
|
||||
result, apiErr := aH.reader.SetTTLV2(context.Background(), claims.OrgID, ¶ms)
|
||||
result, apiErr := aH.reader.SetTTLV2(context.Background(), claims.OrgID, claims.UserID, ¶ms)
|
||||
if apiErr != nil {
|
||||
render.Error(w, errorsV2.New(errorsV2.TypeInvalidInput, errorsV2.CodeInternal, apiErr.Error()))
|
||||
return
|
||||
|
||||
@@ -46,8 +46,8 @@ type Reader interface {
|
||||
GetFlamegraphSpansForTrace(ctx context.Context, orgID valuer.UUID, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, error)
|
||||
|
||||
// Setter Interfaces
|
||||
SetTTL(ctx context.Context, orgID string, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
|
||||
SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error)
|
||||
SetTTL(ctx context.Context, orgID string, userID string, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
|
||||
SetTTLV2(ctx context.Context, orgID string, userID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error)
|
||||
|
||||
FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error)
|
||||
GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)
|
||||
|
||||
@@ -53,7 +53,6 @@ func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.Teleme
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flagger,
|
||||
), metadataStore
|
||||
}
|
||||
|
||||
@@ -103,7 +102,6 @@ func prepareQuerierForLogs(t *testing.T, telemetryStore telemetrystore.Telemetry
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
fl,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -148,6 +146,5 @@ func prepareQuerierForTraces(t *testing.T, telemetryStore telemetrystore.Telemet
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
fl,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -196,6 +196,7 @@ func NewSQLMigrationProviderFactories(
|
||||
sqlmigration.NewDropUserDeletedAtFactory(sqlstore, sqlschema),
|
||||
sqlmigration.NewMigrateAWSAllRegionsFactory(sqlstore),
|
||||
sqlmigration.NewAddServiceAccountManagedRoleTransactionsFactory(sqlstore),
|
||||
sqlmigration.NewUpdateTTLSettingUserAuditFactory(sqlstore, sqlschema),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
84
pkg/sqlmigration/079_update_ttl_setting_user_audit.go
Normal file
84
pkg/sqlmigration/079_update_ttl_setting_user_audit.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package sqlmigration
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/sqlschema"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
)
|
||||
|
||||
type updateTTLSettingUserAudit struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
sqlschema sqlschema.SQLSchema
|
||||
}
|
||||
|
||||
func NewUpdateTTLSettingUserAuditFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("update_ttl_setting_user_audit"), func(ctx context.Context, providerSettings factory.ProviderSettings, config Config) (SQLMigration, error) {
|
||||
return newUpdateTTLSettingUserAudit(ctx, providerSettings, config, sqlstore, sqlschema)
|
||||
})
|
||||
}
|
||||
|
||||
func newUpdateTTLSettingUserAudit(_ context.Context, _ factory.ProviderSettings, _ Config, sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) (SQLMigration, error) {
|
||||
return &updateTTLSettingUserAudit{
|
||||
sqlstore: sqlstore,
|
||||
sqlschema: sqlschema,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (migration *updateTTLSettingUserAudit) Register(migrations *migrate.Migrations) error {
|
||||
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *updateTTLSettingUserAudit) Up(ctx context.Context, db *bun.DB) error {
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = tx.Rollback()
|
||||
}()
|
||||
|
||||
table, uniqueConstraints, err := migration.sqlschema.GetTable(ctx, sqlschema.TableName("ttl_setting"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
columns := []*sqlschema.Column{
|
||||
{
|
||||
Name: sqlschema.ColumnName("created_by"),
|
||||
DataType: sqlschema.DataTypeText,
|
||||
Nullable: true,
|
||||
},
|
||||
{
|
||||
Name: sqlschema.ColumnName("updated_by"),
|
||||
DataType: sqlschema.DataTypeText,
|
||||
Nullable: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, column := range columns {
|
||||
sqls := migration.sqlschema.Operator().AddColumn(table, uniqueConstraints, column, nil)
|
||||
for _, sql := range sqls {
|
||||
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *updateTTLSettingUserAudit) Down(ctx context.Context, db *bun.DB) error {
|
||||
return nil
|
||||
}
|
||||
@@ -9,6 +9,7 @@ type TTLSetting struct {
|
||||
bun.BaseModel `bun:"table:ttl_setting"`
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
types.UserAuditable
|
||||
TransactionID string `bun:"transaction_id,type:text,notnull"`
|
||||
TableName string `bun:"table_name,type:text,notnull"`
|
||||
TTL int `bun:"ttl,notnull,default:0"`
|
||||
|
||||
@@ -20,7 +20,7 @@ from fixtures.querier import (
|
||||
|
||||
|
||||
def _get_bodies(response: requests.Response) -> list[dict[str, Any]]:
|
||||
return [row["data"]["body"] for row in get_rows(response)]
|
||||
return [json.loads(row["data"]["body"]) for row in get_rows(response)]
|
||||
|
||||
|
||||
def _run_query_case(signoz: types.SigNoz, token: str, now: datetime, case: dict[str, Any]) -> None:
|
||||
@@ -1183,7 +1183,7 @@ def test_message_searches(
|
||||
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
|
||||
|
||||
def _body_messages(response: requests.Response) -> list[str]:
|
||||
return [row["data"]["body"].get("message", "") for row in get_rows(response)]
|
||||
return [json.loads(row["data"]["body"]).get("message", "") for row in get_rows(response)]
|
||||
|
||||
payment_messages = {
|
||||
"Payment processed successfully",
|
||||
|
||||
Reference in New Issue
Block a user