mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-11 12:40:36 +01:00
Compare commits
5 Commits
postproces
...
nv/tags
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d12c846212 | ||
|
|
4e5bd7cf6f | ||
|
|
bd11e985e1 | ||
|
|
f0b533b198 | ||
|
|
a2b4a685ad |
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/SigNoz/signoz/cmd"
|
||||
"github.com/SigNoz/signoz/ee/auditor/fileauditor"
|
||||
"github.com/SigNoz/signoz/ee/auditor/otlphttpauditor"
|
||||
"github.com/SigNoz/signoz/ee/authn/callbackauthn/oidccallbackauthn"
|
||||
"github.com/SigNoz/signoz/ee/authn/callbackauthn/samlcallbackauthn"
|
||||
@@ -155,6 +156,9 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
|
||||
if err := factories.Add(otlphttpauditor.NewFactory(licensing, version.Info)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := factories.Add(fileauditor.NewFactory(licensing, version.Info)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return factories
|
||||
},
|
||||
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
|
||||
|
||||
@@ -2599,6 +2599,54 @@ components:
|
||||
- requiredMetricsCheck
|
||||
- endTimeBeforeRetention
|
||||
type: object
|
||||
InframonitoringtypesNamespaceRecord:
|
||||
properties:
|
||||
meta:
|
||||
additionalProperties:
|
||||
type: string
|
||||
nullable: true
|
||||
type: object
|
||||
namespaceCPU:
|
||||
format: double
|
||||
type: number
|
||||
namespaceMemory:
|
||||
format: double
|
||||
type: number
|
||||
namespaceName:
|
||||
type: string
|
||||
podCountsByPhase:
|
||||
$ref: '#/components/schemas/InframonitoringtypesPodCountsByPhase'
|
||||
required:
|
||||
- namespaceName
|
||||
- namespaceCPU
|
||||
- namespaceMemory
|
||||
- podCountsByPhase
|
||||
- meta
|
||||
type: object
|
||||
InframonitoringtypesNamespaces:
|
||||
properties:
|
||||
endTimeBeforeRetention:
|
||||
type: boolean
|
||||
records:
|
||||
items:
|
||||
$ref: '#/components/schemas/InframonitoringtypesNamespaceRecord'
|
||||
nullable: true
|
||||
type: array
|
||||
requiredMetricsCheck:
|
||||
$ref: '#/components/schemas/InframonitoringtypesRequiredMetricsCheck'
|
||||
total:
|
||||
type: integer
|
||||
type:
|
||||
$ref: '#/components/schemas/InframonitoringtypesResponseType'
|
||||
warning:
|
||||
$ref: '#/components/schemas/Querybuildertypesv5QueryWarnData'
|
||||
required:
|
||||
- type
|
||||
- records
|
||||
- total
|
||||
- requiredMetricsCheck
|
||||
- endTimeBeforeRetention
|
||||
type: object
|
||||
InframonitoringtypesNodeCondition:
|
||||
enum:
|
||||
- ready
|
||||
@@ -2802,6 +2850,32 @@ components:
|
||||
- end
|
||||
- limit
|
||||
type: object
|
||||
InframonitoringtypesPostableNamespaces:
|
||||
properties:
|
||||
end:
|
||||
format: int64
|
||||
type: integer
|
||||
filter:
|
||||
$ref: '#/components/schemas/Querybuildertypesv5Filter'
|
||||
groupBy:
|
||||
items:
|
||||
$ref: '#/components/schemas/Querybuildertypesv5GroupByKey'
|
||||
nullable: true
|
||||
type: array
|
||||
limit:
|
||||
type: integer
|
||||
offset:
|
||||
type: integer
|
||||
orderBy:
|
||||
$ref: '#/components/schemas/Querybuildertypesv5OrderBy'
|
||||
start:
|
||||
format: int64
|
||||
type: integer
|
||||
required:
|
||||
- start
|
||||
- end
|
||||
- limit
|
||||
type: object
|
||||
InframonitoringtypesPostableNodes:
|
||||
properties:
|
||||
end:
|
||||
@@ -11740,6 +11814,74 @@ paths:
|
||||
summary: List Hosts for Infra Monitoring
|
||||
tags:
|
||||
- inframonitoring
|
||||
/api/v2/infra_monitoring/namespaces:
|
||||
post:
|
||||
deprecated: false
|
||||
description: 'Returns a paginated list of Kubernetes namespaces with key aggregated
|
||||
pod metrics: CPU usage and memory working set (summed across pods in the group),
|
||||
plus per-group podCountsByPhase ({ pending, running, succeeded, failed, unknown
|
||||
} from each pod''s latest k8s.pod.phase value in the window). Each namespace
|
||||
includes metadata attributes (k8s.namespace.name, k8s.cluster.name). The response
|
||||
type is ''list'' for the default k8s.namespace.name grouping or ''grouped_list''
|
||||
for custom groupBy keys; in both modes every row aggregates pods in the group.
|
||||
Supports filtering via a filter expression, custom groupBy, ordering by cpu
|
||||
/ memory, and pagination via offset/limit. Also reports missing required metrics
|
||||
and whether the requested time range falls before the data retention boundary.
|
||||
Numeric metric fields (namespaceCPU, namespaceMemory) return -1 as a sentinel
|
||||
when no data is available for that field.'
|
||||
operationId: ListNamespaces
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/InframonitoringtypesPostableNamespaces'
|
||||
responses:
|
||||
"200":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
properties:
|
||||
data:
|
||||
$ref: '#/components/schemas/InframonitoringtypesNamespaces'
|
||||
status:
|
||||
type: string
|
||||
required:
|
||||
- status
|
||||
- data
|
||||
type: object
|
||||
description: OK
|
||||
"400":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Bad Request
|
||||
"401":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Unauthorized
|
||||
"403":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Forbidden
|
||||
"500":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Internal Server Error
|
||||
security:
|
||||
- api_key:
|
||||
- VIEWER
|
||||
- tokenizer:
|
||||
- VIEWER
|
||||
summary: List Namespaces for Infra Monitoring
|
||||
tags:
|
||||
- inframonitoring
|
||||
/api/v2/infra_monitoring/nodes:
|
||||
post:
|
||||
deprecated: false
|
||||
|
||||
33
ee/auditor/fileauditor/export.go
Normal file
33
ee/auditor/fileauditor/export.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package fileauditor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types/audittypes"
|
||||
)
|
||||
|
||||
func (provider *provider) export(ctx context.Context, events []audittypes.AuditEvent) error {
|
||||
logs := audittypes.NewPLogsFromAuditEvents(events, "signoz", provider.build.Version(), "signoz.audit")
|
||||
|
||||
payload, err := provider.marshaler.MarshalLogs(logs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Combine the payload and trailing newline into one Write call so the line
|
||||
// is emitted in a single syscall — concurrent readers see either the full
|
||||
// line or nothing, never a torn JSON object.
|
||||
payload = append(payload, '\n')
|
||||
|
||||
provider.mu.Lock()
|
||||
defer provider.mu.Unlock()
|
||||
|
||||
if _, err := provider.file.Write(payload); err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "audit export failed", errors.Attr(err), slog.Int("dropped_log_records", len(events)))
|
||||
return err
|
||||
}
|
||||
|
||||
return provider.file.Sync()
|
||||
}
|
||||
100
ee/auditor/fileauditor/provider.go
Normal file
100
ee/auditor/fileauditor/provider.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package fileauditor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/auditor"
|
||||
"github.com/SigNoz/signoz/pkg/auditor/auditorserver"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/types/audittypes"
|
||||
"github.com/SigNoz/signoz/pkg/version"
|
||||
"go.opentelemetry.io/collector/pdata/plog"
|
||||
)
|
||||
|
||||
var _ auditor.Auditor = (*provider)(nil)
|
||||
|
||||
type provider struct {
|
||||
settings factory.ScopedProviderSettings
|
||||
config auditor.Config
|
||||
licensing licensing.Licensing
|
||||
build version.Build
|
||||
server *auditorserver.Server
|
||||
marshaler plog.JSONMarshaler
|
||||
file *os.File
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewFactory(licensing licensing.Licensing, build version.Build) factory.ProviderFactory[auditor.Auditor, auditor.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("file"), func(ctx context.Context, providerSettings factory.ProviderSettings, config auditor.Config) (auditor.Auditor, error) {
|
||||
return newProvider(ctx, providerSettings, config, licensing, build)
|
||||
})
|
||||
}
|
||||
|
||||
func newProvider(_ context.Context, providerSettings factory.ProviderSettings, config auditor.Config, licensing licensing.Licensing, build version.Build) (auditor.Auditor, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/ee/auditor/fileauditor")
|
||||
|
||||
file, err := os.OpenFile(config.File.Path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInvalidInput, auditor.ErrCodeAuditExportFailed, "failed to open audit file %q", config.File.Path)
|
||||
}
|
||||
|
||||
provider := &provider{
|
||||
settings: settings,
|
||||
config: config,
|
||||
licensing: licensing,
|
||||
build: build,
|
||||
marshaler: plog.JSONMarshaler{},
|
||||
file: file,
|
||||
}
|
||||
|
||||
server, err := auditorserver.New(settings,
|
||||
auditorserver.Config{
|
||||
BufferSize: config.BufferSize,
|
||||
BatchSize: config.BatchSize,
|
||||
FlushInterval: config.FlushInterval,
|
||||
},
|
||||
provider.export,
|
||||
)
|
||||
if err != nil {
|
||||
_ = file.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
provider.server = server
|
||||
return provider, nil
|
||||
}
|
||||
|
||||
func (provider *provider) Start(ctx context.Context) error {
|
||||
return provider.server.Start(ctx)
|
||||
}
|
||||
|
||||
func (provider *provider) Audit(ctx context.Context, event audittypes.AuditEvent) {
|
||||
if event.PrincipalAttributes.PrincipalOrgID.IsZero() {
|
||||
provider.settings.Logger().WarnContext(ctx, "audit event dropped as org_id is zero")
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := provider.licensing.GetActive(ctx, event.PrincipalAttributes.PrincipalOrgID); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
provider.server.Add(ctx, event)
|
||||
}
|
||||
|
||||
func (provider *provider) Healthy() <-chan struct{} {
|
||||
return provider.server.Healthy()
|
||||
}
|
||||
|
||||
func (provider *provider) Stop(ctx context.Context) error {
|
||||
serverErr := provider.server.Stop(ctx)
|
||||
fileErr := provider.file.Close()
|
||||
if serverErr != nil {
|
||||
return serverErr
|
||||
}
|
||||
|
||||
return fileErr
|
||||
}
|
||||
@@ -13,9 +13,11 @@ import type {
|
||||
|
||||
import type {
|
||||
InframonitoringtypesPostableHostsDTO,
|
||||
InframonitoringtypesPostableNamespacesDTO,
|
||||
InframonitoringtypesPostableNodesDTO,
|
||||
InframonitoringtypesPostablePodsDTO,
|
||||
ListHosts200,
|
||||
ListNamespaces200,
|
||||
ListNodes200,
|
||||
ListPods200,
|
||||
RenderErrorResponseDTO,
|
||||
@@ -108,6 +110,90 @@ export const useListHosts = <
|
||||
|
||||
return useMutation(mutationOptions);
|
||||
};
|
||||
/**
|
||||
* Returns a paginated list of Kubernetes namespaces with key aggregated pod metrics: CPU usage and memory working set (summed across pods in the group), plus per-group podCountsByPhase ({ pending, running, succeeded, failed, unknown } from each pod's latest k8s.pod.phase value in the window). Each namespace includes metadata attributes (k8s.namespace.name, k8s.cluster.name). The response type is 'list' for the default k8s.namespace.name grouping or 'grouped_list' for custom groupBy keys; in both modes every row aggregates pods in the group. Supports filtering via a filter expression, custom groupBy, ordering by cpu / memory, and pagination via offset/limit. Also reports missing required metrics and whether the requested time range falls before the data retention boundary. Numeric metric fields (namespaceCPU, namespaceMemory) return -1 as a sentinel when no data is available for that field.
|
||||
* @summary List Namespaces for Infra Monitoring
|
||||
*/
|
||||
export const listNamespaces = (
|
||||
inframonitoringtypesPostableNamespacesDTO: BodyType<InframonitoringtypesPostableNamespacesDTO>,
|
||||
signal?: AbortSignal,
|
||||
) => {
|
||||
return GeneratedAPIInstance<ListNamespaces200>({
|
||||
url: `/api/v2/infra_monitoring/namespaces`,
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
data: inframonitoringtypesPostableNamespacesDTO,
|
||||
signal,
|
||||
});
|
||||
};
|
||||
|
||||
export const getListNamespacesMutationOptions = <
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
TContext = unknown,
|
||||
>(options?: {
|
||||
mutation?: UseMutationOptions<
|
||||
Awaited<ReturnType<typeof listNamespaces>>,
|
||||
TError,
|
||||
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> },
|
||||
TContext
|
||||
>;
|
||||
}): UseMutationOptions<
|
||||
Awaited<ReturnType<typeof listNamespaces>>,
|
||||
TError,
|
||||
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> },
|
||||
TContext
|
||||
> => {
|
||||
const mutationKey = ['listNamespaces'];
|
||||
const { mutation: mutationOptions } = options
|
||||
? options.mutation &&
|
||||
'mutationKey' in options.mutation &&
|
||||
options.mutation.mutationKey
|
||||
? options
|
||||
: { ...options, mutation: { ...options.mutation, mutationKey } }
|
||||
: { mutation: { mutationKey } };
|
||||
|
||||
const mutationFn: MutationFunction<
|
||||
Awaited<ReturnType<typeof listNamespaces>>,
|
||||
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> }
|
||||
> = (props) => {
|
||||
const { data } = props ?? {};
|
||||
|
||||
return listNamespaces(data);
|
||||
};
|
||||
|
||||
return { mutationFn, ...mutationOptions };
|
||||
};
|
||||
|
||||
export type ListNamespacesMutationResult = NonNullable<
|
||||
Awaited<ReturnType<typeof listNamespaces>>
|
||||
>;
|
||||
export type ListNamespacesMutationBody =
|
||||
BodyType<InframonitoringtypesPostableNamespacesDTO>;
|
||||
export type ListNamespacesMutationError = ErrorType<RenderErrorResponseDTO>;
|
||||
|
||||
/**
|
||||
* @summary List Namespaces for Infra Monitoring
|
||||
*/
|
||||
export const useListNamespaces = <
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
TContext = unknown,
|
||||
>(options?: {
|
||||
mutation?: UseMutationOptions<
|
||||
Awaited<ReturnType<typeof listNamespaces>>,
|
||||
TError,
|
||||
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> },
|
||||
TContext
|
||||
>;
|
||||
}): UseMutationResult<
|
||||
Awaited<ReturnType<typeof listNamespaces>>,
|
||||
TError,
|
||||
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> },
|
||||
TContext
|
||||
> => {
|
||||
const mutationOptions = getListNamespacesMutationOptions(options);
|
||||
|
||||
return useMutation(mutationOptions);
|
||||
};
|
||||
/**
|
||||
* Returns a paginated list of Kubernetes nodes with key metrics: CPU usage, CPU allocatable, memory working set, memory allocatable, per-group nodeCountsByReadiness ({ ready, notReady } from each node's latest k8s.node.condition_ready in the window) and per-group podCountsByPhase ({ pending, running, succeeded, failed, unknown } for pods scheduled on the listed nodes). Each node includes metadata attributes (k8s.node.uid, k8s.cluster.name). The response type is 'list' for the default k8s.node.name grouping (each row is one node with its current condition string: ready / not_ready / no_data) or 'grouped_list' for custom groupBy keys (each row aggregates nodes in the group; condition stays no_data). Supports filtering via a filter expression, custom groupBy, ordering by cpu / cpu_allocatable / memory / memory_allocatable, and pagination via offset/limit. Also reports missing required metrics and whether the requested time range falls before the data retention boundary. Numeric metric fields (nodeCPU, nodeCPUAllocatable, nodeMemory, nodeMemoryAllocatable) return -1 as a sentinel when no data is available for that field.
|
||||
* @summary List Nodes for Infra Monitoring
|
||||
|
||||
@@ -4653,6 +4653,55 @@ export interface InframonitoringtypesHostsDTO {
|
||||
warning?: Querybuildertypesv5QueryWarnDataDTO;
|
||||
}
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type InframonitoringtypesNamespaceRecordDTOMeta = {
|
||||
[key: string]: string;
|
||||
} | null;
|
||||
|
||||
export interface InframonitoringtypesNamespaceRecordDTO {
|
||||
/**
|
||||
* @type object
|
||||
* @nullable true
|
||||
*/
|
||||
meta: InframonitoringtypesNamespaceRecordDTOMeta;
|
||||
/**
|
||||
* @type number
|
||||
* @format double
|
||||
*/
|
||||
namespaceCPU: number;
|
||||
/**
|
||||
* @type number
|
||||
* @format double
|
||||
*/
|
||||
namespaceMemory: number;
|
||||
/**
|
||||
* @type string
|
||||
*/
|
||||
namespaceName: string;
|
||||
podCountsByPhase: InframonitoringtypesPodCountsByPhaseDTO;
|
||||
}
|
||||
|
||||
export interface InframonitoringtypesNamespacesDTO {
|
||||
/**
|
||||
* @type boolean
|
||||
*/
|
||||
endTimeBeforeRetention: boolean;
|
||||
/**
|
||||
* @type array
|
||||
* @nullable true
|
||||
*/
|
||||
records: InframonitoringtypesNamespaceRecordDTO[] | null;
|
||||
requiredMetricsCheck: InframonitoringtypesRequiredMetricsCheckDTO;
|
||||
/**
|
||||
* @type integer
|
||||
*/
|
||||
total: number;
|
||||
type: InframonitoringtypesResponseTypeDTO;
|
||||
warning?: Querybuildertypesv5QueryWarnDataDTO;
|
||||
}
|
||||
|
||||
export enum InframonitoringtypesNodeConditionDTO {
|
||||
ready = 'ready',
|
||||
not_ready = 'not_ready',
|
||||
@@ -4864,6 +4913,34 @@ export interface InframonitoringtypesPostableHostsDTO {
|
||||
start: number;
|
||||
}
|
||||
|
||||
export interface InframonitoringtypesPostableNamespacesDTO {
|
||||
/**
|
||||
* @type integer
|
||||
* @format int64
|
||||
*/
|
||||
end: number;
|
||||
filter?: Querybuildertypesv5FilterDTO;
|
||||
/**
|
||||
* @type array
|
||||
* @nullable true
|
||||
*/
|
||||
groupBy?: Querybuildertypesv5GroupByKeyDTO[] | null;
|
||||
/**
|
||||
* @type integer
|
||||
*/
|
||||
limit: number;
|
||||
/**
|
||||
* @type integer
|
||||
*/
|
||||
offset?: number;
|
||||
orderBy?: Querybuildertypesv5OrderByDTO;
|
||||
/**
|
||||
* @type integer
|
||||
* @format int64
|
||||
*/
|
||||
start: number;
|
||||
}
|
||||
|
||||
export interface InframonitoringtypesPostableNodesDTO {
|
||||
/**
|
||||
* @type integer
|
||||
@@ -9255,6 +9332,14 @@ export type ListHosts200 = {
|
||||
status: string;
|
||||
};
|
||||
|
||||
export type ListNamespaces200 = {
|
||||
data: InframonitoringtypesNamespacesDTO;
|
||||
/**
|
||||
* @type string
|
||||
*/
|
||||
status: string;
|
||||
};
|
||||
|
||||
export type ListNodes200 = {
|
||||
data: InframonitoringtypesNodesDTO;
|
||||
/**
|
||||
|
||||
@@ -50,7 +50,6 @@ import {
|
||||
import { JsonView } from 'periscope/components/JsonView';
|
||||
import { useAppContext } from 'providers/App/App';
|
||||
import { AppState } from 'store/reducers';
|
||||
import { ILogBody } from 'types/api/logs/log';
|
||||
import { Query, TagFilter } from 'types/api/queryBuilder/queryBuilderData';
|
||||
import { DataSource, StringOperators } from 'types/common/queryBuilder';
|
||||
import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
@@ -218,17 +217,20 @@ function LogDetailInner({
|
||||
|
||||
const logBody = useMemo(() => {
|
||||
if (!isBodyJsonQueryEnabled) {
|
||||
return (log?.body as string) ?? '';
|
||||
return log?.body || '';
|
||||
}
|
||||
// Feature enabled: body is always a map; message is always a string
|
||||
const bodyObj = log?.body as ILogBody;
|
||||
if (!bodyObj) {
|
||||
return '';
|
||||
|
||||
try {
|
||||
const json = JSON.parse(log?.body || '');
|
||||
|
||||
if (typeof json?.message === 'string' && json.message !== '') {
|
||||
return json.message;
|
||||
}
|
||||
|
||||
return log?.body || '';
|
||||
} catch (error) {
|
||||
return log?.body || '';
|
||||
}
|
||||
if (bodyObj.message) {
|
||||
return bodyObj.message;
|
||||
}
|
||||
return JSON.stringify(bodyObj);
|
||||
}, [isBodyJsonQueryEnabled, log?.body]);
|
||||
|
||||
const htmlBody = useMemo(
|
||||
|
||||
@@ -9,10 +9,7 @@ import { Color } from '@signozhq/design-tokens';
|
||||
import { Tooltip } from 'antd';
|
||||
import { VIEW_TYPES } from 'components/LogDetail/constants';
|
||||
import { DATE_TIME_FORMATS } from 'constants/dateTimeFormats';
|
||||
import {
|
||||
getBodyDisplayString,
|
||||
getSanitizedLogBody,
|
||||
} from 'container/LogDetailedView/utils';
|
||||
import { getSanitizedLogBody } from 'container/LogDetailedView/utils';
|
||||
import { useCopyLogLink } from 'hooks/logs/useCopyLogLink';
|
||||
// hooks
|
||||
import { useIsDarkMode } from 'hooks/useDarkMode';
|
||||
@@ -102,7 +99,7 @@ function RawLogView({
|
||||
// Check if body is selected
|
||||
const showBody = selectedFields.some((field) => field.name === 'body');
|
||||
if (showBody) {
|
||||
parts.push(`${attributesText} ${getBodyDisplayString(data.body)}`);
|
||||
parts.push(`${attributesText} ${data.body}`);
|
||||
} else {
|
||||
parts.push(attributesText);
|
||||
}
|
||||
|
||||
@@ -2,10 +2,7 @@ import type { ReactElement } from 'react';
|
||||
import { useMemo } from 'react';
|
||||
import TanStackTable from 'components/TanStackTableView';
|
||||
import { DATE_TIME_FORMATS } from 'constants/dateTimeFormats';
|
||||
import {
|
||||
getBodyDisplayString,
|
||||
getSanitizedLogBody,
|
||||
} from 'container/LogDetailedView/utils';
|
||||
import { getSanitizedLogBody } from 'container/LogDetailedView/utils';
|
||||
import { FontSize } from 'container/OptionsMenu/types';
|
||||
import { FlatLogData } from 'lib/logs/flatLogData';
|
||||
import { useTimezone } from 'providers/Timezone';
|
||||
@@ -90,7 +87,7 @@ export function useLogsTableColumns({
|
||||
? {
|
||||
id: 'body',
|
||||
header: 'Body',
|
||||
accessorFn: (log): string => getBodyDisplayString(log.body),
|
||||
accessorFn: (log): string => log.body,
|
||||
canBeHidden: false,
|
||||
width: { default: '100%', min: 300 },
|
||||
cell: ({ value, isActive }): ReactElement => (
|
||||
|
||||
@@ -398,7 +398,7 @@ describe('useTableParams (selective URL mode — partial config object)', () =>
|
||||
.filter(Boolean)
|
||||
.pop();
|
||||
expect(lastExpanded).toBeDefined();
|
||||
expect(JSON.parse(lastExpanded!)).toEqual(
|
||||
expect(JSON.parse(lastExpanded!)).toStrictEqual(
|
||||
expect.arrayContaining(['row-1', 'row-2']),
|
||||
);
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ import {
|
||||
import { useQueryBuilder } from 'hooks/queryBuilder/useQueryBuilder';
|
||||
import { GetMetricQueryRange } from 'lib/dashboard/getQueryResults';
|
||||
import { isArray } from 'lodash-es';
|
||||
import { getBodyDisplayString } from 'container/LogDetailedView/utils';
|
||||
import { ChevronDown, ChevronLeft, ChevronRight, Loader2 } from 'lucide-react';
|
||||
import { IBuilderQuery } from 'types/api/queryBuilder/queryBuilderData';
|
||||
import { DataSource } from 'types/common/queryBuilder';
|
||||
@@ -174,7 +173,7 @@ export default function Events({
|
||||
(event): EventDataType => ({
|
||||
timestamp: event.timestamp,
|
||||
severity: event.data.severity_text,
|
||||
body: getBodyDisplayString(event.data.body),
|
||||
body: event.data.body,
|
||||
id: event.data.id,
|
||||
key: event.data.id,
|
||||
resources_string: event.data.resources_string,
|
||||
|
||||
@@ -14,7 +14,7 @@ import { ILog } from 'types/api/logs/log';
|
||||
|
||||
import { ActionItemProps } from './ActionItem';
|
||||
import TableView from './TableView';
|
||||
import { getBodyDisplayString, removeEscapeCharacters } from './utils';
|
||||
import { removeEscapeCharacters } from './utils';
|
||||
|
||||
import './Overview.styles.scss';
|
||||
|
||||
@@ -113,7 +113,7 @@ function Overview({
|
||||
children: (
|
||||
<div className="logs-body-content">
|
||||
<MEditor
|
||||
value={removeEscapeCharacters(getBodyDisplayString(logData.body))}
|
||||
value={removeEscapeCharacters(logData.body)}
|
||||
language="json"
|
||||
options={options}
|
||||
onChange={(): void => {}}
|
||||
|
||||
@@ -107,20 +107,10 @@ function TableView({
|
||||
isListViewPanel,
|
||||
]);
|
||||
|
||||
// When USE_JSON_BODY is enabled, body arrives as a pre-parsed object. Serialize it
|
||||
// back to a string so flattenObject keeps `body` as a single table row instead of
|
||||
// recursively expanding it into dotted sub-keys (body.message, body.foo.bar, …),
|
||||
// which would break the tree view in BodyContent that relies on record.field === 'body'.
|
||||
const flattenLogData: Record<string, string> | null = useMemo(() => {
|
||||
if (!logData) {
|
||||
return null;
|
||||
}
|
||||
const normalizedLog =
|
||||
typeof logData.body === 'object' && logData.body !== null
|
||||
? { ...logData, body: JSON.stringify(logData.body) }
|
||||
: logData;
|
||||
return flattenObject(normalizedLog);
|
||||
}, [logData]);
|
||||
const flattenLogData: Record<string, string> | null = useMemo(
|
||||
() => (logData ? flattenObject(logData) : null),
|
||||
[logData],
|
||||
);
|
||||
|
||||
const handleClick = (
|
||||
operator: string,
|
||||
|
||||
@@ -10,7 +10,7 @@ const MAX_BODY_BYTES = 100 * 1024; // 100 KB
|
||||
|
||||
// Hook for async JSON processing
|
||||
const useAsyncJSONProcessing = (
|
||||
value: string | Record<string, unknown>,
|
||||
value: string,
|
||||
shouldProcess: boolean,
|
||||
handleChangeSelectedView?: ChangeViewFunctionType,
|
||||
): {
|
||||
@@ -40,17 +40,11 @@ const useAsyncJSONProcessing = (
|
||||
return (): void => {};
|
||||
}
|
||||
|
||||
// When value is already a parsed object skip the size check and JSON parsing
|
||||
const parseBody = (): Record<string, unknown> | null => {
|
||||
if (typeof value === 'object' && value !== null) {
|
||||
return value as Record<string, unknown>;
|
||||
}
|
||||
const byteSize = new Blob([value as string]).size;
|
||||
if (byteSize > MAX_BODY_BYTES) {
|
||||
return null;
|
||||
}
|
||||
return recursiveParseJSON(value as string);
|
||||
};
|
||||
// Avoid processing if the json is too large
|
||||
const byteSize = new Blob([value]).size;
|
||||
if (byteSize > MAX_BODY_BYTES) {
|
||||
return (): void => {};
|
||||
}
|
||||
|
||||
processingRef.current = true;
|
||||
setJsonState({ isLoading: true, treeData: null, error: null });
|
||||
@@ -59,8 +53,8 @@ const useAsyncJSONProcessing = (
|
||||
const processAsync = (): void => {
|
||||
setTimeout(() => {
|
||||
try {
|
||||
const parsedBody = parseBody();
|
||||
if (parsedBody && !isEmpty(parsedBody)) {
|
||||
const parsedBody = recursiveParseJSON(value);
|
||||
if (!isEmpty(parsedBody)) {
|
||||
const treeData = jsonToDataNodes(parsedBody, {
|
||||
isBodyJsonQueryEnabled,
|
||||
handleChangeSelectedView,
|
||||
@@ -88,8 +82,8 @@ const useAsyncJSONProcessing = (
|
||||
// eslint-disable-next-line sonarjs/no-identical-functions
|
||||
(): void => {
|
||||
try {
|
||||
const parsedBody = parseBody();
|
||||
if (parsedBody && !isEmpty(parsedBody)) {
|
||||
const parsedBody = recursiveParseJSON(value);
|
||||
if (!isEmpty(parsedBody)) {
|
||||
const treeData = jsonToDataNodes(parsedBody, {
|
||||
isBodyJsonQueryEnabled,
|
||||
handleChangeSelectedView,
|
||||
|
||||
@@ -4,11 +4,7 @@ import { ChangeViewFunctionType } from 'container/ExplorerOptions/types';
|
||||
import { MetricsType } from 'container/MetricsApplication/constant';
|
||||
import dompurify from 'dompurify';
|
||||
import { uniqueId } from 'lodash-es';
|
||||
import {
|
||||
ILog,
|
||||
ILogAggregateAttributesResources,
|
||||
ILogBody,
|
||||
} from 'types/api/logs/log';
|
||||
import { ILog, ILogAggregateAttributesResources } from 'types/api/logs/log';
|
||||
import { DataTypes } from 'types/api/queryBuilder/queryAutocompleteResponse';
|
||||
import { FORBID_DOM_PURIFY_ATTR, FORBID_DOM_PURIFY_TAGS } from 'utils/app';
|
||||
|
||||
@@ -437,24 +433,3 @@ export const getSanitizedLogBody = (
|
||||
return '{}';
|
||||
}
|
||||
};
|
||||
|
||||
// Returns a plain string for display contexts (Monaco editor, table cells, raw log row).
|
||||
export function getBodyDisplayString(body: string | ILogBody): string {
|
||||
return typeof body === 'string' ? body : JSON.stringify(body as ILogBody);
|
||||
}
|
||||
|
||||
// Returns the primary "message" text for compact log row previews.
|
||||
export function getBodyMessage(
|
||||
body: string | ILogBody,
|
||||
isBodyJsonEnabled: boolean,
|
||||
): string {
|
||||
if (!isBodyJsonEnabled) {
|
||||
return (body as string) ?? '';
|
||||
}
|
||||
// Feature enabled: body is always a map; message is always a string
|
||||
const msg = (body as ILogBody).message;
|
||||
if (msg) {
|
||||
return msg;
|
||||
}
|
||||
return JSON.stringify(body);
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ import { ExpandAltOutlined } from '@ant-design/icons';
|
||||
import LogDetail from 'components/LogDetail';
|
||||
import { VIEW_TYPES } from 'components/LogDetail/constants';
|
||||
import { DATE_TIME_FORMATS } from 'constants/dateTimeFormats';
|
||||
import { getBodyDisplayString } from 'container/LogDetailedView/utils';
|
||||
import { useActiveLog } from 'hooks/logs/useActiveLog';
|
||||
import { useTimezone } from 'providers/Timezone';
|
||||
import { ILog } from 'types/api/logs/log';
|
||||
@@ -27,9 +26,7 @@ function LogsList({ logs }: LogsListProps): JSX.Element {
|
||||
DATE_TIME_FORMATS.UTC_MONTH_SHORT,
|
||||
)}
|
||||
</div>
|
||||
<div className="logs-preview-list-item-body">
|
||||
{getBodyDisplayString(log.body)}
|
||||
</div>
|
||||
<div className="logs-preview-list-item-body">{log.body}</div>
|
||||
<div
|
||||
className="logs-preview-list-item-expand"
|
||||
onClick={makeLogDetailsHandler(log)}
|
||||
|
||||
@@ -1,8 +1,3 @@
|
||||
export interface ILogBody {
|
||||
message?: string | null;
|
||||
[key: string]: unknown;
|
||||
}
|
||||
|
||||
export interface ILog {
|
||||
date: string;
|
||||
timestamp: number | string;
|
||||
@@ -13,7 +8,7 @@ export interface ILog {
|
||||
traceFlags: number;
|
||||
severityText: string;
|
||||
severityNumber: number;
|
||||
body: string | ILogBody;
|
||||
body: string;
|
||||
resources_string: Record<string, never>;
|
||||
scope_string: Record<string, never>;
|
||||
attributesString: Record<string, never>;
|
||||
|
||||
2
go.mod
2
go.mod
@@ -11,7 +11,6 @@ require (
|
||||
github.com/SigNoz/signoz-otel-collector v0.144.3
|
||||
github.com/antlr4-go/antlr/v4 v4.13.1
|
||||
github.com/antonmedv/expr v1.15.3
|
||||
github.com/bytedance/sonic v1.14.1
|
||||
github.com/cespare/xxhash/v2 v2.3.0
|
||||
github.com/coreos/go-oidc/v3 v3.17.0
|
||||
github.com/dgraph-io/ristretto/v2 v2.3.0
|
||||
@@ -113,6 +112,7 @@ require (
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 // indirect
|
||||
github.com/aws/smithy-go v1.24.2 // indirect
|
||||
github.com/bytedance/gopkg v0.1.3 // indirect
|
||||
github.com/bytedance/sonic v1.14.1 // indirect
|
||||
github.com/bytedance/sonic/loader v0.3.0 // indirect
|
||||
github.com/cloudwego/base64x v0.1.6 // indirect
|
||||
github.com/emersion/go-sasl v0.0.0-20241020182733-b788ff22d5a6 // indirect
|
||||
|
||||
@@ -67,5 +67,24 @@ func (provider *provider) addInfraMonitoringRoutes(router *mux.Router) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := router.Handle("/api/v2/infra_monitoring/namespaces", handler.New(
|
||||
provider.authzMiddleware.ViewAccess(provider.infraMonitoringHandler.ListNamespaces),
|
||||
handler.OpenAPIDef{
|
||||
ID: "ListNamespaces",
|
||||
Tags: []string{"inframonitoring"},
|
||||
Summary: "List Namespaces for Infra Monitoring",
|
||||
Description: "Returns a paginated list of Kubernetes namespaces with key aggregated pod metrics: CPU usage and memory working set (summed across pods in the group), plus per-group podCountsByPhase ({ pending, running, succeeded, failed, unknown } from each pod's latest k8s.pod.phase value in the window). Each namespace includes metadata attributes (k8s.namespace.name, k8s.cluster.name). The response type is 'list' for the default k8s.namespace.name grouping or 'grouped_list' for custom groupBy keys; in both modes every row aggregates pods in the group. Supports filtering via a filter expression, custom groupBy, ordering by cpu / memory, and pagination via offset/limit. Also reports missing required metrics and whether the requested time range falls before the data retention boundary. Numeric metric fields (namespaceCPU, namespaceMemory) return -1 as a sentinel when no data is available for that field.",
|
||||
Request: new(inframonitoringtypes.PostableNamespaces),
|
||||
RequestContentType: "application/json",
|
||||
Response: new(inframonitoringtypes.Namespaces),
|
||||
ResponseContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusOK,
|
||||
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized},
|
||||
Deprecated: false,
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
|
||||
})).Methods(http.MethodPost).GetError(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -25,6 +25,8 @@ type Config struct {
|
||||
FlushInterval time.Duration `mapstructure:"flush_interval"`
|
||||
|
||||
OTLPHTTP OTLPHTTPConfig `mapstructure:"otlphttp"`
|
||||
|
||||
File FileConfig `mapstructure:"file"`
|
||||
}
|
||||
|
||||
// OTLPHTTPConfig holds configuration for the OTLP HTTP exporter provider.
|
||||
@@ -46,6 +48,12 @@ type OTLPHTTPConfig struct {
|
||||
Retry RetryConfig `mapstructure:"retry"`
|
||||
}
|
||||
|
||||
type FileConfig struct {
|
||||
// Path is the absolute path to the audit log file. The file is opened with
|
||||
// O_APPEND|O_CREATE|O_WRONLY; existing contents are preserved across runs.
|
||||
Path string `mapstructure:"path"`
|
||||
}
|
||||
|
||||
// RetryConfig configures exponential backoff for the OTLP HTTP exporter.
|
||||
type RetryConfig struct {
|
||||
// Enabled controls whether retries are attempted on transient failures.
|
||||
@@ -111,5 +119,11 @@ func (c Config) Validate() error {
|
||||
}
|
||||
}
|
||||
|
||||
if c.Provider == "file" {
|
||||
if c.File.Path == "" {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "auditor::file::path must be set when provider is file")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -93,3 +93,27 @@ func (h *handler) ListNodes(rw http.ResponseWriter, req *http.Request) {
|
||||
|
||||
render.Success(rw, http.StatusOK, result)
|
||||
}
|
||||
|
||||
func (h *handler) ListNamespaces(rw http.ResponseWriter, req *http.Request) {
|
||||
claims, err := authtypes.ClaimsFromContext(req.Context())
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
|
||||
var parsedReq inframonitoringtypes.PostableNamespaces
|
||||
if err := binding.JSON.BindBody(req.Body, &parsedReq); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := h.module.ListNamespaces(req.Context(), orgID, &parsedReq)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, result)
|
||||
}
|
||||
|
||||
@@ -337,3 +337,92 @@ func (m *module) ListNodes(ctx context.Context, orgID valuer.UUID, req *inframon
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *module) ListNamespaces(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableNamespaces) (*inframonitoringtypes.Namespaces, error) {
|
||||
if err := req.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp := &inframonitoringtypes.Namespaces{}
|
||||
|
||||
if req.OrderBy == nil {
|
||||
req.OrderBy = &qbtypes.OrderBy{
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: inframonitoringtypes.NamespacesOrderByCPU,
|
||||
},
|
||||
},
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
}
|
||||
}
|
||||
|
||||
if len(req.GroupBy) == 0 {
|
||||
req.GroupBy = []qbtypes.GroupByKey{namespaceNameGroupByKey}
|
||||
resp.Type = inframonitoringtypes.ResponseTypeList
|
||||
} else {
|
||||
resp.Type = inframonitoringtypes.ResponseTypeGroupedList
|
||||
}
|
||||
|
||||
missingMetrics, minFirstReportedUnixMilli, err := m.getMetricsExistenceAndEarliestTime(ctx, namespacesTableMetricNamesList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(missingMetrics) > 0 {
|
||||
resp.RequiredMetricsCheck = inframonitoringtypes.RequiredMetricsCheck{MissingMetrics: missingMetrics}
|
||||
resp.Records = []inframonitoringtypes.NamespaceRecord{}
|
||||
resp.Total = 0
|
||||
return resp, nil
|
||||
}
|
||||
if req.End < int64(minFirstReportedUnixMilli) {
|
||||
resp.EndTimeBeforeRetention = true
|
||||
resp.Records = []inframonitoringtypes.NamespaceRecord{}
|
||||
resp.Total = 0
|
||||
return resp, nil
|
||||
}
|
||||
resp.RequiredMetricsCheck = inframonitoringtypes.RequiredMetricsCheck{MissingMetrics: []string{}}
|
||||
|
||||
metadataMap, err := m.getNamespacesTableMetadata(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp.Total = len(metadataMap)
|
||||
|
||||
pageGroups, err := m.getTopNamespaceGroups(ctx, orgID, req, metadataMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(pageGroups) == 0 {
|
||||
resp.Records = []inframonitoringtypes.NamespaceRecord{}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
filterExpr := ""
|
||||
if req.Filter != nil {
|
||||
filterExpr = req.Filter.Expression
|
||||
}
|
||||
|
||||
fullQueryReq := buildFullQueryRequest(req.Start, req.End, filterExpr, req.GroupBy, pageGroups, m.newNamespacesTableListQuery())
|
||||
queryResp, err := m.querier.QueryRange(ctx, orgID, fullQueryReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
|
||||
// Start/End/Filter/GroupBy from PostablePods.
|
||||
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
|
||||
Start: req.Start,
|
||||
End: req.End,
|
||||
Filter: req.Filter,
|
||||
GroupBy: req.GroupBy,
|
||||
}, pageGroups)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp.Records = buildNamespaceRecords(queryResp, pageGroups, req.GroupBy, metadataMap, phaseCounts)
|
||||
resp.Warning = queryResp.Warning
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
123
pkg/modules/inframonitoring/implinframonitoring/namespaces.go
Normal file
123
pkg/modules/inframonitoring/implinframonitoring/namespaces.go
Normal file
@@ -0,0 +1,123 @@
|
||||
package implinframonitoring
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// buildNamespaceRecords assembles the page records. Pod phase counts come from
|
||||
// phaseCounts in both modes; every row is a group of pods, so there's no
|
||||
// per-row "current phase" concept (unlike pods/nodes list mode).
|
||||
func buildNamespaceRecords(
|
||||
resp *qbtypes.QueryRangeResponse,
|
||||
pageGroups []map[string]string,
|
||||
groupBy []qbtypes.GroupByKey,
|
||||
metadataMap map[string]map[string]string,
|
||||
phaseCounts map[string]podPhaseCounts,
|
||||
) []inframonitoringtypes.NamespaceRecord {
|
||||
metricsMap := parseFullQueryResponse(resp, groupBy)
|
||||
|
||||
records := make([]inframonitoringtypes.NamespaceRecord, 0, len(pageGroups))
|
||||
for _, labels := range pageGroups {
|
||||
compositeKey := compositeKeyFromLabels(labels, groupBy)
|
||||
namespaceName := labels[namespaceNameAttrKey]
|
||||
|
||||
record := inframonitoringtypes.NamespaceRecord{ // initialize with default values
|
||||
NamespaceName: namespaceName,
|
||||
NamespaceCPU: -1,
|
||||
NamespaceMemory: -1,
|
||||
Meta: map[string]string{},
|
||||
}
|
||||
|
||||
if metrics, ok := metricsMap[compositeKey]; ok {
|
||||
if v, exists := metrics["A"]; exists {
|
||||
record.NamespaceCPU = v
|
||||
}
|
||||
if v, exists := metrics["D"]; exists {
|
||||
record.NamespaceMemory = v
|
||||
}
|
||||
}
|
||||
|
||||
if phaseCountsForGroup, ok := phaseCounts[compositeKey]; ok {
|
||||
record.PodCountsByPhase = inframonitoringtypes.PodCountsByPhase{
|
||||
Pending: phaseCountsForGroup.Pending,
|
||||
Running: phaseCountsForGroup.Running,
|
||||
Succeeded: phaseCountsForGroup.Succeeded,
|
||||
Failed: phaseCountsForGroup.Failed,
|
||||
Unknown: phaseCountsForGroup.Unknown,
|
||||
}
|
||||
}
|
||||
|
||||
if attrs, ok := metadataMap[compositeKey]; ok {
|
||||
for k, v := range attrs {
|
||||
record.Meta[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
records = append(records, record)
|
||||
}
|
||||
return records
|
||||
}
|
||||
|
||||
func (m *module) getTopNamespaceGroups(
|
||||
ctx context.Context,
|
||||
orgID valuer.UUID,
|
||||
req *inframonitoringtypes.PostableNamespaces,
|
||||
metadataMap map[string]map[string]string,
|
||||
) ([]map[string]string, error) {
|
||||
orderByKey := req.OrderBy.Key.Name
|
||||
queryNamesForOrderBy := orderByToNamespacesQueryNames[orderByKey]
|
||||
rankingQueryName := queryNamesForOrderBy[len(queryNamesForOrderBy)-1]
|
||||
|
||||
topReq := &qbtypes.QueryRangeRequest{
|
||||
Start: uint64(req.Start),
|
||||
End: uint64(req.End),
|
||||
RequestType: qbtypes.RequestTypeScalar,
|
||||
CompositeQuery: qbtypes.CompositeQuery{
|
||||
Queries: make([]qbtypes.QueryEnvelope, 0, len(queryNamesForOrderBy)),
|
||||
},
|
||||
}
|
||||
|
||||
for _, envelope := range m.newNamespacesTableListQuery().CompositeQuery.Queries {
|
||||
if !slices.Contains(queryNamesForOrderBy, envelope.GetQueryName()) {
|
||||
continue
|
||||
}
|
||||
copied := envelope
|
||||
if copied.Type == qbtypes.QueryTypeBuilder {
|
||||
existingExpr := ""
|
||||
if f := copied.GetFilter(); f != nil {
|
||||
existingExpr = f.Expression
|
||||
}
|
||||
reqFilterExpr := ""
|
||||
if req.Filter != nil {
|
||||
reqFilterExpr = req.Filter.Expression
|
||||
}
|
||||
merged := mergeFilterExpressions(existingExpr, reqFilterExpr)
|
||||
copied.SetFilter(&qbtypes.Filter{Expression: merged})
|
||||
copied.SetGroupBy(req.GroupBy)
|
||||
}
|
||||
topReq.CompositeQuery.Queries = append(topReq.CompositeQuery.Queries, copied)
|
||||
}
|
||||
|
||||
resp, err := m.querier.QueryRange(ctx, orgID, topReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
allMetricGroups := parseAndSortGroups(resp, rankingQueryName, req.GroupBy, req.OrderBy.Direction)
|
||||
return paginateWithBackfill(allMetricGroups, metadataMap, req.GroupBy, req.Offset, req.Limit), nil
|
||||
}
|
||||
|
||||
func (m *module) getNamespacesTableMetadata(ctx context.Context, req *inframonitoringtypes.PostableNamespaces) (map[string]map[string]string, error) {
|
||||
var nonGroupByAttrs []string
|
||||
for _, key := range namespaceAttrKeysForMetadata {
|
||||
if !isKeyInGroupByAttrs(req.GroupBy, key) {
|
||||
nonGroupByAttrs = append(nonGroupByAttrs, key)
|
||||
}
|
||||
}
|
||||
return m.getMetadata(ctx, namespacesTableMetricNamesList, req.GroupBy, nonGroupByAttrs, req.Filter, req.Start, req.End)
|
||||
}
|
||||
@@ -0,0 +1,92 @@
|
||||
package implinframonitoring
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
const (
|
||||
namespaceNameAttrKey = "k8s.namespace.name"
|
||||
)
|
||||
|
||||
var namespaceNameGroupByKey = qbtypes.GroupByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: namespaceNameAttrKey,
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
}
|
||||
|
||||
// namespacesTableMetricNamesList drives the existence/retention check.
|
||||
// Includes k8s.pod.phase so the response short-circuits cleanly when a
|
||||
// cluster doesn't ship the metric — even though phase isn't part of the
|
||||
// QB composite query (it's queried separately via getPerGroupPodPhaseCounts).
|
||||
var namespacesTableMetricNamesList = []string{
|
||||
"k8s.pod.cpu.usage",
|
||||
"k8s.pod.memory.working_set",
|
||||
"k8s.pod.phase",
|
||||
}
|
||||
|
||||
var namespaceAttrKeysForMetadata = []string{
|
||||
"k8s.namespace.name",
|
||||
"k8s.cluster.name",
|
||||
}
|
||||
|
||||
var orderByToNamespacesQueryNames = map[string][]string{
|
||||
inframonitoringtypes.NamespacesOrderByCPU: {"A"},
|
||||
inframonitoringtypes.NamespacesOrderByMemory: {"D"},
|
||||
}
|
||||
|
||||
// newNamespacesTableListQuery builds the composite QB v5 request for the namespaces list.
|
||||
// Pod phase counts are derived separately via getPerGroupPodPhaseCounts (works for both
|
||||
// list and grouped_list modes), so no phase query is included here.
|
||||
// Query letters A and D are kept aligned with the v1 implementation.
|
||||
func (m *module) newNamespacesTableListQuery() *qbtypes.QueryRangeRequest {
|
||||
queries := []qbtypes.QueryEnvelope{
|
||||
// Query A: CPU usage — sum of pod CPU within the group.
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Name: "A",
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "k8s.pod.cpu.usage",
|
||||
TimeAggregation: metrictypes.TimeAggregationAvg,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationSum,
|
||||
ReduceTo: qbtypes.ReduceToAvg,
|
||||
},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{namespaceNameGroupByKey},
|
||||
Disabled: false,
|
||||
},
|
||||
},
|
||||
// Query D: Memory working set — sum of pod memory within the group.
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Name: "D",
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "k8s.pod.memory.working_set",
|
||||
TimeAggregation: metrictypes.TimeAggregationAvg,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationSum,
|
||||
ReduceTo: qbtypes.ReduceToAvg,
|
||||
},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{namespaceNameGroupByKey},
|
||||
Disabled: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return &qbtypes.QueryRangeRequest{
|
||||
RequestType: qbtypes.RequestTypeScalar,
|
||||
CompositeQuery: qbtypes.CompositeQuery{
|
||||
Queries: queries,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -12,10 +12,12 @@ type Handler interface {
|
||||
ListHosts(http.ResponseWriter, *http.Request)
|
||||
ListPods(http.ResponseWriter, *http.Request)
|
||||
ListNodes(http.ResponseWriter, *http.Request)
|
||||
ListNamespaces(http.ResponseWriter, *http.Request)
|
||||
}
|
||||
|
||||
type Module interface {
|
||||
ListHosts(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableHosts) (*inframonitoringtypes.Hosts, error)
|
||||
ListPods(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostablePods) (*inframonitoringtypes.Pods, error)
|
||||
ListNodes(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableNodes) (*inframonitoringtypes.Nodes, error)
|
||||
ListNamespaces(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableNamespaces) (*inframonitoringtypes.Namespaces, error)
|
||||
}
|
||||
|
||||
57
pkg/modules/tag/impltag/module.go
Normal file
57
pkg/modules/tag/impltag/module.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package impltag
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/modules/tag"
|
||||
"github.com/SigNoz/signoz/pkg/types/tagtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type module struct {
|
||||
store tagtypes.Store
|
||||
}
|
||||
|
||||
func NewModule(store tagtypes.Store) tag.Module {
|
||||
return &module{store: store}
|
||||
}
|
||||
|
||||
func (m *module) CreateMany(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType, postable []tagtypes.PostableTag, createdBy string) ([]*tagtypes.Tag, error) {
|
||||
if len(postable) == 0 {
|
||||
return []*tagtypes.Tag{}, nil
|
||||
}
|
||||
|
||||
toCreate, matched, err := tagtypes.Resolve(ctx, m.store, orgID, entityType, postable, createdBy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
created, err := m.store.Create(ctx, toCreate)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return append(matched, created...), nil
|
||||
}
|
||||
|
||||
func (m *module) LinkToEntity(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType, entityID valuer.UUID, tagIDs []valuer.UUID) error {
|
||||
if len(tagIDs) == 0 {
|
||||
return nil
|
||||
}
|
||||
return m.store.CreateRelations(ctx, tagtypes.NewTagRelations(orgID, entityType, entityID, tagIDs))
|
||||
}
|
||||
|
||||
func (m *module) SyncLinksForEntity(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType, entityID valuer.UUID, tagIDs []valuer.UUID) error {
|
||||
if err := m.store.CreateRelations(ctx, tagtypes.NewTagRelations(orgID, entityType, entityID, tagIDs)); err != nil {
|
||||
return err
|
||||
}
|
||||
return m.store.DeleteRelationsExcept(ctx, entityType, entityID, tagIDs)
|
||||
}
|
||||
|
||||
func (m *module) ListForEntity(ctx context.Context, entityType tagtypes.EntityType, entityID valuer.UUID) ([]*tagtypes.Tag, error) {
|
||||
return m.store.ListByEntity(ctx, entityType, entityID)
|
||||
}
|
||||
|
||||
func (m *module) ListForEntities(ctx context.Context, entityType tagtypes.EntityType, entityIDs []valuer.UUID) (map[valuer.UUID][]*tagtypes.Tag, error) {
|
||||
return m.store.ListByEntities(ctx, entityType, entityIDs)
|
||||
}
|
||||
132
pkg/modules/tag/impltag/store.go
Normal file
132
pkg/modules/tag/impltag/store.go
Normal file
@@ -0,0 +1,132 @@
|
||||
package impltag
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types/tagtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
type store struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewStore(sqlstore sqlstore.SQLStore) tagtypes.Store {
|
||||
return &store{sqlstore: sqlstore}
|
||||
}
|
||||
|
||||
func (s *store) List(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType) ([]*tagtypes.Tag, error) {
|
||||
tags := make([]*tagtypes.Tag, 0)
|
||||
err := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewSelect().
|
||||
Model(&tags).
|
||||
Where("org_id = ?", orgID).
|
||||
Where("entity_type = ?", entityType).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
func (s *store) ListByEntity(ctx context.Context, entityType tagtypes.EntityType, entityID valuer.UUID) ([]*tagtypes.Tag, error) {
|
||||
tags := make([]*tagtypes.Tag, 0)
|
||||
err := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewSelect().
|
||||
Model(&tags).
|
||||
Join("JOIN tag_relations AS tr ON tr.tag_id = tag.id").
|
||||
Where("tr.entity_type = ?", entityType).
|
||||
Where("tr.entity_id = ?", entityID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
func (s *store) ListByEntities(ctx context.Context, entityType tagtypes.EntityType, entityIDs []valuer.UUID) (map[valuer.UUID][]*tagtypes.Tag, error) {
|
||||
if len(entityIDs) == 0 {
|
||||
return map[valuer.UUID][]*tagtypes.Tag{}, nil
|
||||
}
|
||||
|
||||
type joinedRow struct {
|
||||
tagtypes.Tag
|
||||
EntityID valuer.UUID `bun:"entity_id"`
|
||||
}
|
||||
|
||||
rows := make([]*joinedRow, 0)
|
||||
err := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewSelect().
|
||||
Model(&rows).
|
||||
ColumnExpr("tag.*, tr.entity_id").
|
||||
Join("JOIN tag_relations AS tr ON tr.tag_id = tag.id").
|
||||
Where("tr.entity_type = ?", entityType).
|
||||
Where("tr.entity_id IN (?)", bun.In(entityIDs)).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := make(map[valuer.UUID][]*tagtypes.Tag)
|
||||
for _, r := range rows {
|
||||
tag := r.Tag
|
||||
out[r.EntityID] = append(out[r.EntityID], &tag)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *store) Create(ctx context.Context, tags []*tagtypes.Tag) ([]*tagtypes.Tag, error) {
|
||||
if len(tags) == 0 {
|
||||
return tags, nil
|
||||
}
|
||||
// DO UPDATE on a self-set is a deliberate no-op write whose only purpose
|
||||
// is to make RETURNING fire on conflicting rows. Without it, RETURNING is
|
||||
// silent on the conflict path and we'd have to refetch by (key, value) to
|
||||
// learn the existing rows' IDs after a concurrent-insert race. Setting
|
||||
// key = tag.key (the existing row's value) preserves the first writer's
|
||||
// casing on case-only collisions.
|
||||
err := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewInsert().
|
||||
Model(&tags).
|
||||
On("CONFLICT (org_id, entity_type, (LOWER(key)), (LOWER(value))) DO UPDATE").
|
||||
Set("key = tag.key").
|
||||
Returning("*").
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
func (s *store) CreateRelations(ctx context.Context, relations []*tagtypes.TagRelation) error {
|
||||
if len(relations) == 0 {
|
||||
return nil
|
||||
}
|
||||
_, err := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewInsert().
|
||||
Model(&relations).
|
||||
On("CONFLICT (entity_type, entity_id, tag_id) DO NOTHING").
|
||||
Exec(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *store) DeleteRelationsExcept(ctx context.Context, entityType tagtypes.EntityType, entityID valuer.UUID, keepTagIDs []valuer.UUID) error {
|
||||
q := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewDelete().
|
||||
Model((*tagtypes.TagRelation)(nil)).
|
||||
Where("entity_type = ?", entityType).
|
||||
Where("entity_id = ?", entityID)
|
||||
if len(keepTagIDs) > 0 {
|
||||
q = q.Where("tag_id NOT IN (?)", bun.In(keepTagIDs))
|
||||
}
|
||||
_, err := q.Exec(ctx)
|
||||
return err
|
||||
}
|
||||
146
pkg/modules/tag/impltag/store_test.go
Normal file
146
pkg/modules/tag/impltag/store_test.go
Normal file
@@ -0,0 +1,146 @@
|
||||
package impltag
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory/factorytest"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore/sqlitesqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types/tagtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
func newTestStore(t *testing.T) sqlstore.SQLStore {
|
||||
t.Helper()
|
||||
dbPath := filepath.Join(t.TempDir(), "test.db")
|
||||
store, err := sqlitesqlstore.New(context.Background(), factorytest.NewSettings(), sqlstore.Config{
|
||||
Provider: "sqlite",
|
||||
Connection: sqlstore.ConnectionConfig{
|
||||
MaxOpenConns: 1,
|
||||
MaxConnLifetime: 0,
|
||||
},
|
||||
Sqlite: sqlstore.SqliteConfig{
|
||||
Path: dbPath,
|
||||
Mode: "wal",
|
||||
BusyTimeout: 5 * time.Second,
|
||||
TransactionMode: "deferred",
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = store.BunDB().NewCreateTable().
|
||||
Model((*tagtypes.Tag)(nil)).
|
||||
IfNotExists().
|
||||
Exec(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = store.BunDB().Exec(`CREATE UNIQUE INDEX IF NOT EXISTS uq_tag_org_entity_lower_key_lower_value ON tag (org_id, entity_type, LOWER(key), LOWER(value))`)
|
||||
require.NoError(t, err)
|
||||
return store
|
||||
}
|
||||
|
||||
var dashboardEntityType = tagtypes.MustNewEntityType("dashboard")
|
||||
|
||||
func tagsByLowerKeyValue(t *testing.T, db *bun.DB) map[string]*tagtypes.Tag {
|
||||
t.Helper()
|
||||
all := make([]*tagtypes.Tag, 0)
|
||||
require.NoError(t, db.NewSelect().Model(&all).Scan(context.Background()))
|
||||
out := map[string]*tagtypes.Tag{}
|
||||
for _, tag := range all {
|
||||
out[strings.ToLower(tag.Key)+"\x00"+strings.ToLower(tag.Value)] = tag
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func TestStore_Create_PopulatesIDsOnFreshInsert(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
sqlstore := newTestStore(t)
|
||||
s := NewStore(sqlstore)
|
||||
|
||||
orgID := valuer.GenerateUUID()
|
||||
tagA := tagtypes.NewTag(orgID, dashboardEntityType, "tag", "Database", "u@signoz.io")
|
||||
tagB := tagtypes.NewTag(orgID, dashboardEntityType, "team", "BLR", "u@signoz.io")
|
||||
preIDA := tagA.ID
|
||||
preIDB := tagB.ID
|
||||
|
||||
got, err := s.Create(ctx, []*tagtypes.Tag{tagA, tagB})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, got, 2)
|
||||
|
||||
// No race → pre-generated IDs stand. The slice is what we passed in,
|
||||
// confirming Scan didn't reallocate.
|
||||
assert.Equal(t, preIDA, got[0].ID)
|
||||
assert.Equal(t, preIDB, got[1].ID)
|
||||
|
||||
// And the rows are in the DB.
|
||||
stored := tagsByLowerKeyValue(t, sqlstore.BunDB())
|
||||
require.Contains(t, stored, "tag\x00database")
|
||||
require.Contains(t, stored, "team\x00blr")
|
||||
assert.Equal(t, preIDA, stored["tag\x00database"].ID)
|
||||
assert.Equal(t, preIDB, stored["team\x00blr"].ID)
|
||||
}
|
||||
|
||||
func TestStore_Create_ConflictReturnsExistingRowID(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
sqlstore := newTestStore(t)
|
||||
s := NewStore(sqlstore)
|
||||
|
||||
orgID := valuer.GenerateUUID()
|
||||
|
||||
// Simulate a concurrent insert: someone else has already inserted "tag:Database".
|
||||
winner := tagtypes.NewTag(orgID, dashboardEntityType, "tag", "Database", "concurrent")
|
||||
_, err := s.Create(ctx, []*tagtypes.Tag{winner})
|
||||
require.NoError(t, err)
|
||||
winnerID := winner.ID
|
||||
|
||||
// Now our request runs with a different pre-generated ID for the same
|
||||
// (key, value) — case differs but the functional unique index collapses
|
||||
// them. RETURNING should overwrite our stale ID with winner's ID.
|
||||
loser := tagtypes.NewTag(orgID, dashboardEntityType, "TAG", "DATABASE", "u@signoz.io")
|
||||
loserPreID := loser.ID
|
||||
require.NotEqual(t, winnerID, loserPreID, "pre-generated IDs must differ for this test to be meaningful")
|
||||
|
||||
got, err := s.Create(ctx, []*tagtypes.Tag{loser})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, got, 1)
|
||||
|
||||
assert.Equal(t, winnerID, got[0].ID, "returned slice should carry the existing row's ID, not our stale one")
|
||||
assert.Equal(t, winnerID, loser.ID, "input slice element is mutated in place")
|
||||
|
||||
// And the DB still has exactly one row for that (lower(key), lower(value)) — winner's, with winner's casing.
|
||||
stored := tagsByLowerKeyValue(t, sqlstore.BunDB())
|
||||
require.Len(t, stored, 1)
|
||||
assert.Equal(t, winnerID, stored["tag\x00database"].ID)
|
||||
assert.Equal(t, "tag", stored["tag\x00database"].Key, "winner's casing preserved in key")
|
||||
assert.Equal(t, "Database", stored["tag\x00database"].Value, "winner's casing preserved in value")
|
||||
}
|
||||
|
||||
func TestStore_Create_MixedFreshAndConflict(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
sqlstore := newTestStore(t)
|
||||
s := NewStore(sqlstore)
|
||||
|
||||
orgID := valuer.GenerateUUID()
|
||||
pre := tagtypes.NewTag(orgID, dashboardEntityType, "tag", "Database", "concurrent")
|
||||
_, err := s.Create(ctx, []*tagtypes.Tag{pre})
|
||||
require.NoError(t, err)
|
||||
preExistingID := pre.ID
|
||||
|
||||
conflict := tagtypes.NewTag(orgID, dashboardEntityType, "tag", "Database", "u@signoz.io")
|
||||
fresh := tagtypes.NewTag(orgID, dashboardEntityType, "team", "BLR", "u@signoz.io")
|
||||
freshPreID := fresh.ID
|
||||
|
||||
got, err := s.Create(ctx, []*tagtypes.Tag{conflict, fresh})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, got, 2)
|
||||
|
||||
assert.Equal(t, preExistingID, got[0].ID, "conflicting row's ID overwritten with the existing row's")
|
||||
assert.Equal(t, freshPreID, got[1].ID, "fresh row's pre-generated ID is preserved")
|
||||
}
|
||||
24
pkg/modules/tag/tag.go
Normal file
24
pkg/modules/tag/tag.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package tag
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/tagtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Module interface {
|
||||
// Does not link the resolved tags to any entity — call LinkToEntity for that.
|
||||
CreateMany(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType, postable []tagtypes.PostableTag, createdBy string) ([]*tagtypes.Tag, error)
|
||||
|
||||
// Existing rows are left untouched.
|
||||
LinkToEntity(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType, entityID valuer.UUID, tagIDs []valuer.UUID) error
|
||||
|
||||
// missing links are inserted, obsolete ones removed.
|
||||
SyncLinksForEntity(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType, entityID valuer.UUID, tagIDs []valuer.UUID) error
|
||||
|
||||
ListForEntity(ctx context.Context, entityType tagtypes.EntityType, entityID valuer.UUID) ([]*tagtypes.Tag, error)
|
||||
|
||||
// Entities with no tags are absent from the returned map.
|
||||
ListForEntities(ctx context.Context, entityType tagtypes.EntityType, entityIDs []valuer.UUID) (map[valuer.UUID][]*tagtypes.Tag, error)
|
||||
}
|
||||
@@ -12,10 +12,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/bytedance/sonic"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -24,8 +22,6 @@ var (
|
||||
// written clickhouse query. The column alias indcate which value is
|
||||
// to be considered as final result (or target).
|
||||
legacyReservedColumnTargetAliases = []string{"__result", "__value", "result", "res", "value"}
|
||||
|
||||
CodeFailUnmarshalJSONColumn = errors.MustNewCode("fail_unmarshal_json_column")
|
||||
)
|
||||
|
||||
// consume reads every row and shapes it into the payload expected for the
|
||||
@@ -397,16 +393,11 @@ func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
|
||||
|
||||
// de-reference the typed pointer to any
|
||||
val := reflect.ValueOf(cellPtr).Elem().Interface()
|
||||
// Post-process JSON columns: unmarshal bytes into map[string]any
|
||||
// Post-process JSON columns: normalize into String value
|
||||
if strings.HasPrefix(strings.ToUpper(colTypes[i].DatabaseTypeName()), "JSON") {
|
||||
switch x := val.(type) {
|
||||
case []byte:
|
||||
var m map[string]any
|
||||
err := sonic.Unmarshal(x, &m)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailUnmarshalJSONColumn, "failed to unmarshal JSON column %s", name)
|
||||
}
|
||||
val = m
|
||||
val = string(x)
|
||||
default:
|
||||
// already a structured type (map[string]any, []any, etc.)
|
||||
}
|
||||
|
||||
@@ -12,12 +12,9 @@ import (
|
||||
"github.com/SigNoz/govaluate"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/types/featuretypes"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// queryInfo holds common query properties.
|
||||
@@ -53,7 +50,7 @@ func getQueryName(spec any) string {
|
||||
return getqueryInfo(spec).Name
|
||||
}
|
||||
|
||||
func (q *querier) postProcessResults(ctx context.Context, orgID valuer.UUID, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
|
||||
func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
|
||||
// Convert results to typed format for processing
|
||||
typedResults := make(map[string]*qbtypes.Result)
|
||||
for name, result := range results {
|
||||
@@ -72,7 +69,6 @@ func (q *querier) postProcessResults(ctx context.Context, orgID valuer.UUID, res
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
|
||||
if result, ok := typedResults[spec.Name]; ok {
|
||||
result = postProcessBuilderQuery(q, result, spec, req)
|
||||
result = q.postProcessLogBody(ctx, orgID, result, req)
|
||||
typedResults[spec.Name] = result
|
||||
}
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
|
||||
@@ -1039,33 +1035,3 @@ func (q *querier) calculateFormulaStep(expression string, req *qbtypes.QueryRang
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// postProcessLogBody removes the "message" key from the body map when it is empty.
|
||||
// Only runs for raw list queries with the use_json_body feature enabled.
|
||||
func (q *querier) postProcessLogBody(ctx context.Context, orgID valuer.UUID, result *qbtypes.Result, req *qbtypes.QueryRangeRequest) *qbtypes.Result {
|
||||
if req.RequestType != qbtypes.RequestTypeRaw {
|
||||
return result
|
||||
}
|
||||
if !q.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, featuretypes.NewFlaggerEvaluationContext(orgID)) {
|
||||
return result
|
||||
}
|
||||
rawData, ok := result.Value.(*qbtypes.RawData)
|
||||
if !ok {
|
||||
return result
|
||||
}
|
||||
for _, row := range rawData.Rows {
|
||||
bodyMap, ok := row.Data["body"].(map[string]any)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if msg, exists := bodyMap["message"]; exists {
|
||||
switch v := msg.(type) {
|
||||
case string:
|
||||
if v == "" {
|
||||
delete(bodyMap, "message")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
@@ -36,7 +35,6 @@ var (
|
||||
|
||||
type querier struct {
|
||||
logger *slog.Logger
|
||||
fl flagger.Flagger
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
metadataStore telemetrytypes.MetadataStore
|
||||
promEngine prometheus.Prometheus
|
||||
@@ -64,12 +62,10 @@ func New(
|
||||
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
|
||||
traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder,
|
||||
bucketCache BucketCache,
|
||||
flagger flagger.Flagger,
|
||||
) *querier {
|
||||
querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier")
|
||||
return &querier{
|
||||
logger: querierSettings.Logger(),
|
||||
fl: flagger,
|
||||
telemetryStore: telemetryStore,
|
||||
metadataStore: metadataStore,
|
||||
promEngine: promEngine,
|
||||
@@ -688,7 +684,7 @@ func (q *querier) run(
|
||||
}
|
||||
|
||||
gomaps.Copy(results, preseededResults)
|
||||
processedResults, err := q.postProcessResults(ctx, orgID, results, req)
|
||||
processedResults, err := q.postProcessResults(ctx, results, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
@@ -45,15 +44,14 @@ func TestQueryRange_MetricTypeMissing(t *testing.T) {
|
||||
providerSettings,
|
||||
nil, // telemetryStore
|
||||
metadataStore,
|
||||
nil, // prometheus
|
||||
nil, // traceStmtBuilder
|
||||
nil, // logStmtBuilder
|
||||
nil, // auditStmtBuilder
|
||||
nil, // metricStmtBuilder
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flaggertest.New(t), // flagger
|
||||
nil, // prometheus
|
||||
nil, // traceStmtBuilder
|
||||
nil, // logStmtBuilder
|
||||
nil, // auditStmtBuilder
|
||||
nil, // metricStmtBuilder
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
)
|
||||
|
||||
req := &qbtypes.QueryRangeRequest{
|
||||
@@ -118,7 +116,6 @@ func TestQueryRange_MetricTypeFromStore(t *testing.T) {
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flaggertest.New(t), // flagger
|
||||
)
|
||||
|
||||
req := &qbtypes.QueryRangeRequest{
|
||||
|
||||
@@ -186,6 +186,5 @@ func newProvider(
|
||||
meterStmtBuilder,
|
||||
traceOperatorStmtBuilder,
|
||||
bucketCache,
|
||||
flagger,
|
||||
), nil
|
||||
}
|
||||
|
||||
@@ -53,7 +53,6 @@ func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.Teleme
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flagger,
|
||||
), metadataStore
|
||||
}
|
||||
|
||||
@@ -103,7 +102,6 @@ func prepareQuerierForLogs(t *testing.T, telemetryStore telemetrystore.Telemetry
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
fl,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -148,6 +146,5 @@ func prepareQuerierForTraces(t *testing.T, telemetryStore telemetrystore.Telemet
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
fl,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard/impldashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/tag/impltag"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser"
|
||||
@@ -44,6 +45,7 @@ func TestNewHandlers(t *testing.T) {
|
||||
emailing := emailingtest.New()
|
||||
queryParser := queryparser.New(providerSettings)
|
||||
require.NoError(t, err)
|
||||
tagModule := impltag.NewModule(impltag.NewStore(sqlstore))
|
||||
dashboardModule := impldashboard.NewModule(impldashboard.NewStore(sqlstore), providerSettings, nil, orgGetter, queryParser)
|
||||
|
||||
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
|
||||
@@ -52,7 +54,7 @@ func TestNewHandlers(t *testing.T) {
|
||||
userRoleStore := impluser.NewUserRoleStore(sqlstore, providerSettings)
|
||||
|
||||
userGetter := impluser.NewGetter(impluser.NewStore(sqlstore, providerSettings), userRoleStore, flagger)
|
||||
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, nil, nil, flagger)
|
||||
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, nil, nil, flagger, tagModule)
|
||||
|
||||
querierHandler := querier.NewHandler(providerSettings, nil, nil)
|
||||
registryHandler := factory.NewHandler(nil)
|
||||
|
||||
@@ -40,6 +40,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/session/implsession"
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanpercentile"
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanpercentile/implspanpercentile"
|
||||
"github.com/SigNoz/signoz/pkg/modules/tag"
|
||||
"github.com/SigNoz/signoz/pkg/modules/tracedetail"
|
||||
"github.com/SigNoz/signoz/pkg/modules/tracedetail/impltracedetail"
|
||||
"github.com/SigNoz/signoz/pkg/modules/tracefunnel"
|
||||
@@ -80,6 +81,7 @@ type Modules struct {
|
||||
CloudIntegration cloudintegration.Module
|
||||
RuleStateHistory rulestatehistory.Module
|
||||
TraceDetail tracedetail.Module
|
||||
Tag tag.Module
|
||||
}
|
||||
|
||||
func NewModules(
|
||||
@@ -104,6 +106,7 @@ func NewModules(
|
||||
serviceAccount serviceaccount.Module,
|
||||
cloudIntegrationModule cloudintegration.Module,
|
||||
fl flagger.Flagger,
|
||||
tagModule tag.Module,
|
||||
) Modules {
|
||||
quickfilter := implquickfilter.NewModule(implquickfilter.NewStore(sqlstore))
|
||||
orgSetter := implorganization.NewSetter(implorganization.NewStore(sqlstore), alertmanager, quickfilter)
|
||||
@@ -133,5 +136,6 @@ func NewModules(
|
||||
RuleStateHistory: implrulestatehistory.NewModule(implrulestatehistory.NewStore(telemetryStore, telemetryMetadataStore, providerSettings.Logger)),
|
||||
CloudIntegration: cloudIntegrationModule,
|
||||
TraceDetail: impltracedetail.NewModule(impltracedetail.NewTraceStore(telemetryStore), providerSettings, config.TraceDetail),
|
||||
Tag: tagModule,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
|
||||
"github.com/SigNoz/signoz/pkg/modules/serviceaccount/implserviceaccount"
|
||||
"github.com/SigNoz/signoz/pkg/modules/tag/impltag"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser"
|
||||
"github.com/SigNoz/signoz/pkg/sharder"
|
||||
@@ -45,6 +46,7 @@ func TestNewModules(t *testing.T) {
|
||||
emailing := emailingtest.New()
|
||||
queryParser := queryparser.New(providerSettings)
|
||||
require.NoError(t, err)
|
||||
tagModule := impltag.NewModule(impltag.NewStore(sqlstore))
|
||||
dashboardModule := impldashboard.NewModule(impldashboard.NewStore(sqlstore), providerSettings, nil, orgGetter, queryParser)
|
||||
|
||||
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
|
||||
@@ -56,7 +58,7 @@ func TestNewModules(t *testing.T) {
|
||||
|
||||
serviceAccount := implserviceaccount.NewModule(implserviceaccount.NewStore(sqlstore), nil, nil, nil, providerSettings, serviceaccount.Config{})
|
||||
|
||||
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, serviceAccount, implcloudintegration.NewModule(), flagger)
|
||||
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, serviceAccount, implcloudintegration.NewModule(), flagger, tagModule)
|
||||
|
||||
reflectVal := reflect.ValueOf(modules)
|
||||
for i := 0; i < reflectVal.NumField(); i++ {
|
||||
|
||||
@@ -196,6 +196,7 @@ func NewSQLMigrationProviderFactories(
|
||||
sqlmigration.NewDropUserDeletedAtFactory(sqlstore, sqlschema),
|
||||
sqlmigration.NewMigrateAWSAllRegionsFactory(sqlstore),
|
||||
sqlmigration.NewAddServiceAccountManagedRoleTransactionsFactory(sqlstore),
|
||||
sqlmigration.NewAddTagsFactory(sqlstore, sqlschema),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/rulestatehistory"
|
||||
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
|
||||
"github.com/SigNoz/signoz/pkg/modules/serviceaccount/implserviceaccount"
|
||||
"github.com/SigNoz/signoz/pkg/modules/tag/impltag"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
@@ -325,6 +326,11 @@ func New(
|
||||
// Initialize query parser (needed for dashboard module)
|
||||
queryParser := queryparser.New(providerSettings)
|
||||
|
||||
// Initialize tag module — shared across modules that link entities to tags
|
||||
// (currently dashboard; future: alerts, RBAC). Built once here and injected
|
||||
// where needed.
|
||||
tagModule := impltag.NewModule(impltag.NewStore(sqlstore))
|
||||
|
||||
// Initialize dashboard module
|
||||
dashboard := dashboardModuleCallback(sqlstore, providerSettings, analytics, orgGetter, queryParser, querier, licensing)
|
||||
|
||||
@@ -441,7 +447,7 @@ func New(
|
||||
}
|
||||
|
||||
// Initialize all modules
|
||||
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, analytics, querier, telemetrystore, telemetryMetadataStore, authNs, authz, cache, queryParser, config, dashboard, userGetter, userRoleStore, serviceAccount, cloudIntegrationModule, flagger)
|
||||
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, analytics, querier, telemetrystore, telemetryMetadataStore, authNs, authz, cache, queryParser, config, dashboard, userGetter, userRoleStore, serviceAccount, cloudIntegrationModule, flagger, tagModule)
|
||||
|
||||
// Initialize ruler from the variant-specific provider factories
|
||||
rulerInstance, err := factory.NewProviderFromNamedMap(ctx, providerSettings, config.Ruler, rulerProviderFactories(cache, alertmanager, sqlstore, telemetrystore, telemetryMetadataStore, prometheus, orgGetter, modules.RuleStateHistory, querier, queryParser), "signoz")
|
||||
|
||||
102
pkg/sqlmigration/079_add_tags.go
Normal file
102
pkg/sqlmigration/079_add_tags.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package sqlmigration
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/sqlschema"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
)
|
||||
|
||||
type addTags struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
sqlschema sqlschema.SQLSchema
|
||||
}
|
||||
|
||||
func NewAddTagsFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("add_tags"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
|
||||
return &addTags{
|
||||
sqlstore: sqlstore,
|
||||
sqlschema: sqlschema,
|
||||
}, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (migration *addTags) Register(migrations *migrate.Migrations) error {
|
||||
return migrations.Register(migration.Up, migration.Down)
|
||||
}
|
||||
|
||||
func (migration *addTags) Up(ctx context.Context, db *bun.DB) error {
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = tx.Rollback()
|
||||
}()
|
||||
|
||||
sqls := [][]byte{}
|
||||
|
||||
tagTableSQLs := migration.sqlschema.Operator().CreateTable(&sqlschema.Table{
|
||||
Name: "tag",
|
||||
Columns: []*sqlschema.Column{
|
||||
{Name: "id", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "key", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "value", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "org_id", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "entity_type", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "created_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
|
||||
{Name: "created_by", DataType: sqlschema.DataTypeText, Nullable: true},
|
||||
{Name: "updated_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
|
||||
{Name: "updated_by", DataType: sqlschema.DataTypeText, Nullable: true},
|
||||
},
|
||||
PrimaryKeyConstraint: &sqlschema.PrimaryKeyConstraint{ColumnNames: []sqlschema.ColumnName{"id"}},
|
||||
ForeignKeyConstraints: []*sqlschema.ForeignKeyConstraint{
|
||||
{
|
||||
ReferencingColumnName: sqlschema.ColumnName("org_id"),
|
||||
ReferencedTableName: sqlschema.TableName("organizations"),
|
||||
ReferencedColumnName: sqlschema.ColumnName("id"),
|
||||
},
|
||||
},
|
||||
})
|
||||
sqls = append(sqls, tagTableSQLs...)
|
||||
|
||||
// Functional unique index: case-insensitive uniqueness on (org_id, entity_type, key, value).
|
||||
// sqlschema.UniqueIndex doesn't support expressions, so emit raw SQL — both
|
||||
// Postgres and SQLite (modernc 3.50.x) support expression indexes.
|
||||
sqls = append(sqls, []byte(`CREATE UNIQUE INDEX IF NOT EXISTS uq_tag_org_entity_lower_key_lower_value ON tag (org_id, entity_type, LOWER(key), LOWER(value))`))
|
||||
|
||||
tagRelationsTableSQLs := migration.sqlschema.Operator().CreateTable(&sqlschema.Table{
|
||||
Name: "tag_relations",
|
||||
Columns: []*sqlschema.Column{
|
||||
{Name: "entity_type", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "entity_id", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "tag_id", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "org_id", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
},
|
||||
PrimaryKeyConstraint: &sqlschema.PrimaryKeyConstraint{ColumnNames: []sqlschema.ColumnName{"entity_type", "entity_id", "tag_id"}},
|
||||
ForeignKeyConstraints: []*sqlschema.ForeignKeyConstraint{
|
||||
{
|
||||
ReferencingColumnName: sqlschema.ColumnName("org_id"),
|
||||
ReferencedTableName: sqlschema.TableName("organizations"),
|
||||
ReferencedColumnName: sqlschema.ColumnName("id"),
|
||||
},
|
||||
},
|
||||
})
|
||||
sqls = append(sqls, tagRelationsTableSQLs...)
|
||||
|
||||
for _, sql := range sqls {
|
||||
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (migration *addTags) Down(_ context.Context, _ *bun.DB) error {
|
||||
return nil
|
||||
}
|
||||
99
pkg/types/inframonitoringtypes/namespaces.go
Normal file
99
pkg/types/inframonitoringtypes/namespaces.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package inframonitoringtypes
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"slices"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
)
|
||||
|
||||
type Namespaces struct {
|
||||
Type ResponseType `json:"type" required:"true"`
|
||||
Records []NamespaceRecord `json:"records" required:"true"`
|
||||
Total int `json:"total" required:"true"`
|
||||
RequiredMetricsCheck RequiredMetricsCheck `json:"requiredMetricsCheck" required:"true"`
|
||||
EndTimeBeforeRetention bool `json:"endTimeBeforeRetention" required:"true"`
|
||||
Warning *qbtypes.QueryWarnData `json:"warning,omitempty"`
|
||||
}
|
||||
|
||||
type NamespaceRecord struct {
|
||||
NamespaceName string `json:"namespaceName" required:"true"`
|
||||
NamespaceCPU float64 `json:"namespaceCPU" required:"true"`
|
||||
NamespaceMemory float64 `json:"namespaceMemory" required:"true"`
|
||||
PodCountsByPhase PodCountsByPhase `json:"podCountsByPhase" required:"true"`
|
||||
Meta map[string]string `json:"meta" required:"true"`
|
||||
}
|
||||
|
||||
// PostableNamespaces is the request body for the v2 namespaces list API.
|
||||
type PostableNamespaces struct {
|
||||
Start int64 `json:"start" required:"true"`
|
||||
End int64 `json:"end" required:"true"`
|
||||
Filter *qbtypes.Filter `json:"filter"`
|
||||
GroupBy []qbtypes.GroupByKey `json:"groupBy"`
|
||||
OrderBy *qbtypes.OrderBy `json:"orderBy"`
|
||||
Offset int `json:"offset"`
|
||||
Limit int `json:"limit" required:"true"`
|
||||
}
|
||||
|
||||
// Validate ensures PostableNamespaces contains acceptable values.
|
||||
func (req *PostableNamespaces) Validate() error {
|
||||
if req == nil {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")
|
||||
}
|
||||
|
||||
if req.Start <= 0 {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"invalid start time %d: start must be greater than 0",
|
||||
req.Start,
|
||||
)
|
||||
}
|
||||
|
||||
if req.End <= 0 {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"invalid end time %d: end must be greater than 0",
|
||||
req.End,
|
||||
)
|
||||
}
|
||||
|
||||
if req.Start >= req.End {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"invalid time range: start (%d) must be less than end (%d)",
|
||||
req.Start,
|
||||
req.End,
|
||||
)
|
||||
}
|
||||
|
||||
if req.Limit < 1 || req.Limit > 5000 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be between 1 and 5000")
|
||||
}
|
||||
|
||||
if req.Offset < 0 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "offset cannot be negative")
|
||||
}
|
||||
|
||||
if req.OrderBy != nil {
|
||||
if !slices.Contains(NamespacesValidOrderByKeys, req.OrderBy.Key.Name) {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order by key: %s", req.OrderBy.Key.Name)
|
||||
}
|
||||
if req.OrderBy.Direction != qbtypes.OrderDirectionAsc && req.OrderBy.Direction != qbtypes.OrderDirectionDesc {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order by direction: %s", req.OrderBy.Direction)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON validates input immediately after decoding.
|
||||
func (req *PostableNamespaces) UnmarshalJSON(data []byte) error {
|
||||
type raw PostableNamespaces
|
||||
var decoded raw
|
||||
if err := json.Unmarshal(data, &decoded); err != nil {
|
||||
return err
|
||||
}
|
||||
*req = PostableNamespaces(decoded)
|
||||
return req.Validate()
|
||||
}
|
||||
11
pkg/types/inframonitoringtypes/namespaces_constants.go
Normal file
11
pkg/types/inframonitoringtypes/namespaces_constants.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package inframonitoringtypes
|
||||
|
||||
const (
|
||||
NamespacesOrderByCPU = "cpu"
|
||||
NamespacesOrderByMemory = "memory"
|
||||
)
|
||||
|
||||
var NamespacesValidOrderByKeys = []string{
|
||||
NamespacesOrderByCPU,
|
||||
NamespacesOrderByMemory,
|
||||
}
|
||||
237
pkg/types/inframonitoringtypes/namespaces_test.go
Normal file
237
pkg/types/inframonitoringtypes/namespaces_test.go
Normal file
@@ -0,0 +1,237 @@
|
||||
package inframonitoringtypes
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPostableNamespaces_Validate(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
req *PostableNamespaces
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "valid request",
|
||||
req: &PostableNamespaces{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Limit: 100,
|
||||
Offset: 0,
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "nil request",
|
||||
req: nil,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "start time zero",
|
||||
req: &PostableNamespaces{
|
||||
Start: 0,
|
||||
End: 2000,
|
||||
Limit: 100,
|
||||
Offset: 0,
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "start time negative",
|
||||
req: &PostableNamespaces{
|
||||
Start: -1000,
|
||||
End: 2000,
|
||||
Limit: 100,
|
||||
Offset: 0,
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "end time zero",
|
||||
req: &PostableNamespaces{
|
||||
Start: 1000,
|
||||
End: 0,
|
||||
Limit: 100,
|
||||
Offset: 0,
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "start time greater than end time",
|
||||
req: &PostableNamespaces{
|
||||
Start: 2000,
|
||||
End: 1000,
|
||||
Limit: 100,
|
||||
Offset: 0,
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "start time equal to end time",
|
||||
req: &PostableNamespaces{
|
||||
Start: 1000,
|
||||
End: 1000,
|
||||
Limit: 100,
|
||||
Offset: 0,
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "limit zero",
|
||||
req: &PostableNamespaces{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Limit: 0,
|
||||
Offset: 0,
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "limit negative",
|
||||
req: &PostableNamespaces{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Limit: -10,
|
||||
Offset: 0,
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "limit exceeds max",
|
||||
req: &PostableNamespaces{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Limit: 5001,
|
||||
Offset: 0,
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "offset negative",
|
||||
req: &PostableNamespaces{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Limit: 100,
|
||||
Offset: -5,
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "orderBy nil is valid",
|
||||
req: &PostableNamespaces{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Limit: 100,
|
||||
Offset: 0,
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "orderBy with valid key cpu and direction asc",
|
||||
req: &PostableNamespaces{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Limit: 100,
|
||||
Offset: 0,
|
||||
OrderBy: &qbtypes.OrderBy{
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: NamespacesOrderByCPU,
|
||||
},
|
||||
},
|
||||
Direction: qbtypes.OrderDirectionAsc,
|
||||
},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "orderBy with valid key memory and direction desc",
|
||||
req: &PostableNamespaces{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Limit: 100,
|
||||
Offset: 0,
|
||||
OrderBy: &qbtypes.OrderBy{
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: NamespacesOrderByMemory,
|
||||
},
|
||||
},
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "orderBy with pod_phase key is rejected",
|
||||
req: &PostableNamespaces{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Limit: 100,
|
||||
Offset: 0,
|
||||
OrderBy: &qbtypes.OrderBy{
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "pod_phase",
|
||||
},
|
||||
},
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
},
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "orderBy with invalid key",
|
||||
req: &PostableNamespaces{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Limit: 100,
|
||||
Offset: 0,
|
||||
OrderBy: &qbtypes.OrderBy{
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "unknown",
|
||||
},
|
||||
},
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
},
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "orderBy with valid key but invalid direction",
|
||||
req: &PostableNamespaces{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Limit: 100,
|
||||
Offset: 0,
|
||||
OrderBy: &qbtypes.OrderBy{
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: NamespacesOrderByMemory,
|
||||
},
|
||||
},
|
||||
Direction: qbtypes.OrderDirection{String: valuer.NewString("invalid")},
|
||||
},
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := tt.req.Validate()
|
||||
if tt.wantErr {
|
||||
require.Error(t, err)
|
||||
require.True(t, errors.Ast(err, errors.TypeInvalidInput), "expected error to be of type InvalidInput")
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
27
pkg/types/tagtypes/store.go
Normal file
27
pkg/types/tagtypes/store.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package tagtypes
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Store interface {
|
||||
List(ctx context.Context, orgID valuer.UUID, entityType EntityType) ([]*Tag, error)
|
||||
|
||||
ListByEntity(ctx context.Context, entityType EntityType, entityID valuer.UUID) ([]*Tag, error)
|
||||
|
||||
ListByEntities(ctx context.Context, entityType EntityType, entityIDs []valuer.UUID) (map[valuer.UUID][]*Tag, error)
|
||||
|
||||
// Create upserts the given tags and returns them with authoritative IDs.
|
||||
// On conflict on (org_id, entity_type, LOWER(key), LOWER(value)) — which
|
||||
// happens only when a concurrent insert raced ours, including casing-only
|
||||
// collisions — the returned entry carries the existing row's ID rather
|
||||
// than the pre-generated one in the input.
|
||||
Create(ctx context.Context, tags []*Tag) ([]*Tag, error)
|
||||
|
||||
// CreateRelations inserts tag-entity relations. Conflicts on the composite primary key are ignored.
|
||||
CreateRelations(ctx context.Context, relations []*TagRelation) error
|
||||
|
||||
DeleteRelationsExcept(ctx context.Context, entityType EntityType, entityID valuer.UUID, keepTagIDs []valuer.UUID) error
|
||||
}
|
||||
137
pkg/types/tagtypes/tag.go
Normal file
137
pkg/types/tagtypes/tag.go
Normal file
@@ -0,0 +1,137 @@
|
||||
package tagtypes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeTagInvalidName = errors.MustNewCode("tag_invalid_name")
|
||||
ErrCodeTagNotFound = errors.MustNewCode("tag_not_found")
|
||||
)
|
||||
|
||||
type Tag struct {
|
||||
bun.BaseModel `bun:"table:tag,alias:tag"`
|
||||
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
types.UserAuditable
|
||||
Key string `json:"key" required:"true" bun:"key,type:text,notnull"`
|
||||
Value string `json:"value" required:"true" bun:"value,type:text,notnull"`
|
||||
OrgID valuer.UUID `json:"orgId" required:"true" bun:"org_id,type:text,notnull"`
|
||||
EntityType EntityType `json:"entityType" required:"true" bun:"entity_type,type:text,notnull"`
|
||||
}
|
||||
|
||||
type PostableTag struct {
|
||||
Key string `json:"key" required:"true"`
|
||||
Value string `json:"value" required:"true"`
|
||||
}
|
||||
|
||||
type GettableTag = PostableTag
|
||||
|
||||
func NewGettableTagFromTag(tag *Tag) *GettableTag {
|
||||
return &GettableTag{Key: tag.Key, Value: tag.Value}
|
||||
}
|
||||
|
||||
func NewGettableTagsFromTags(tags []*Tag) []*GettableTag {
|
||||
out := make([]*GettableTag, len(tags))
|
||||
for i, t := range tags {
|
||||
out[i] = NewGettableTagFromTag(t)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func NewTag(orgID valuer.UUID, entityType EntityType, key, value, createdBy string) *Tag {
|
||||
now := time.Now()
|
||||
return &Tag{
|
||||
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
|
||||
TimeAuditable: types.TimeAuditable{
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
},
|
||||
UserAuditable: types.UserAuditable{
|
||||
CreatedBy: createdBy,
|
||||
UpdatedBy: createdBy,
|
||||
},
|
||||
Key: key,
|
||||
Value: value,
|
||||
OrgID: orgID,
|
||||
EntityType: entityType,
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve canonicalizes a batch of user-supplied (key, value) tag pairs against
|
||||
// the existing tags for an org. Lookup is case-insensitive on both key and
|
||||
// value (matching the storage uniqueness rule); when an existing row matches,
|
||||
// its display casing is reused. Inputs are deduped on (LOWER(key), LOWER(value));
|
||||
// the first input's casing wins on collisions. Returns:
|
||||
// - toCreate: new Tag rows the caller should insert (with pre-generated IDs)
|
||||
// - matched: existing rows the caller's input already pointed to. They
|
||||
// already carry authoritative IDs from the store.
|
||||
func Resolve(ctx context.Context, store Store, orgID valuer.UUID, entityType EntityType, postable []PostableTag, createdBy string) ([]*Tag, []*Tag, error) {
|
||||
if len(postable) == 0 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
existing, err := store.List(ctx, orgID, entityType)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
lowercaseTagsMap := make(map[string]*Tag, len(existing))
|
||||
for _, t := range existing {
|
||||
mapKey := strings.ToLower(t.Key) + "\x00" + strings.ToLower(t.Value)
|
||||
lowercaseTagsMap[mapKey] = t
|
||||
}
|
||||
|
||||
seenInRequestAlready := make(map[string]struct{}, len(postable)) // postable can have the same tag multiple times
|
||||
toCreate := make([]*Tag, 0)
|
||||
matched := make([]*Tag, 0)
|
||||
|
||||
for _, p := range postable {
|
||||
key, value, err := validatePostableTag(p)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
lookup := strings.ToLower(key) + "\x00" + strings.ToLower(value)
|
||||
if _, dup := seenInRequestAlready[lookup]; dup {
|
||||
continue
|
||||
}
|
||||
seenInRequestAlready[lookup] = struct{}{}
|
||||
|
||||
if existingTag, ok := lowercaseTagsMap[lookup]; ok {
|
||||
matched = append(matched, existingTag)
|
||||
continue
|
||||
}
|
||||
toCreate = append(toCreate, NewTag(orgID, entityType, key, value, createdBy))
|
||||
}
|
||||
|
||||
return toCreate, matched, nil
|
||||
}
|
||||
|
||||
// Entity-specific reserved-key checks (e.g. dashboard column names that would
|
||||
// collide with the list-query DSL) are the caller's responsibility — perform
|
||||
// them before calling into the tag module.
|
||||
func validatePostableTag(p PostableTag) (string, string, error) {
|
||||
key := strings.TrimSpace(p.Key)
|
||||
value := strings.TrimSpace(p.Value)
|
||||
if key == "" {
|
||||
return "", "", errors.Newf(errors.TypeInvalidInput, ErrCodeTagInvalidName, "tag key cannot be empty")
|
||||
}
|
||||
if value == "" {
|
||||
return "", "", errors.Newf(errors.TypeInvalidInput, ErrCodeTagInvalidName, "tag value cannot be empty")
|
||||
}
|
||||
if strings.ContainsRune(key, '/') {
|
||||
return "", "", errors.Newf(errors.TypeInvalidInput, ErrCodeTagInvalidName, "tag key %q cannot contain '/'", key)
|
||||
}
|
||||
if strings.ContainsRune(value, '/') {
|
||||
return "", "", errors.Newf(errors.TypeInvalidInput, ErrCodeTagInvalidName, "tag value %q cannot contain '/'", value)
|
||||
}
|
||||
return key, value, nil
|
||||
}
|
||||
38
pkg/types/tagtypes/tag_relation.go
Normal file
38
pkg/types/tagtypes/tag_relation.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package tagtypes
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
type EntityType struct{ valuer.String }
|
||||
|
||||
func MustNewEntityType(name string) EntityType {
|
||||
return EntityType{valuer.NewString(name)}
|
||||
}
|
||||
|
||||
type TagRelation struct {
|
||||
bun.BaseModel `bun:"table:tag_relations,alias:tag_relations"`
|
||||
|
||||
EntityType EntityType `json:"entityType" required:"true" bun:"entity_type,type:text,notnull"`
|
||||
EntityID valuer.UUID `json:"entityId" required:"true" bun:"entity_id,pk,type:text,notnull"`
|
||||
TagID valuer.UUID `json:"tagId" required:"true" bun:"tag_id,pk,type:text,notnull"`
|
||||
OrgID valuer.UUID `json:"orgId" required:"true" bun:"org_id,type:text,notnull"`
|
||||
}
|
||||
|
||||
func NewTagRelation(orgID valuer.UUID, entityType EntityType, entityID valuer.UUID, tagID valuer.UUID) *TagRelation {
|
||||
return &TagRelation{
|
||||
EntityType: entityType,
|
||||
EntityID: entityID,
|
||||
TagID: tagID,
|
||||
OrgID: orgID,
|
||||
}
|
||||
}
|
||||
|
||||
func NewTagRelations(orgID valuer.UUID, entityType EntityType, entityID valuer.UUID, tagIDs []valuer.UUID) []*TagRelation {
|
||||
relations := make([]*TagRelation, 0, len(tagIDs))
|
||||
for _, tagID := range tagIDs {
|
||||
relations = append(relations, NewTagRelation(orgID, entityType, entityID, tagID))
|
||||
}
|
||||
return relations
|
||||
}
|
||||
166
pkg/types/tagtypes/tag_test.go
Normal file
166
pkg/types/tagtypes/tag_test.go
Normal file
@@ -0,0 +1,166 @@
|
||||
package tagtypes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestValidatePostableTag(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input PostableTag
|
||||
wantKey string
|
||||
wantValue string
|
||||
wantError bool
|
||||
}{
|
||||
{name: "simple pair", input: PostableTag{Key: "team", Value: "pulse"}, wantKey: "team", wantValue: "pulse"},
|
||||
{name: "preserves casing", input: PostableTag{Key: "Team", Value: "Pulse"}, wantKey: "Team", wantValue: "Pulse"},
|
||||
{name: "trims key", input: PostableTag{Key: " team ", Value: "pulse"}, wantKey: "team", wantValue: "pulse"},
|
||||
{name: "trims value", input: PostableTag{Key: "team", Value: " pulse "}, wantKey: "team", wantValue: "pulse"},
|
||||
|
||||
{name: "empty key rejected", input: PostableTag{Key: "", Value: "pulse"}, wantError: true},
|
||||
{name: "empty value rejected", input: PostableTag{Key: "team", Value: ""}, wantError: true},
|
||||
{name: "whitespace-only key rejected", input: PostableTag{Key: " ", Value: "pulse"}, wantError: true},
|
||||
{name: "whitespace-only value rejected", input: PostableTag{Key: "team", Value: " "}, wantError: true},
|
||||
|
||||
{name: "slash in key rejected", input: PostableTag{Key: "team/eng", Value: "pulse"}, wantError: true},
|
||||
{name: "slash in value rejected", input: PostableTag{Key: "team", Value: "pulse/events"}, wantError: true},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
gotKey, gotValue, err := validatePostableTag(tc.input)
|
||||
if tc.wantError {
|
||||
require.Error(t, err)
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.wantKey, gotKey)
|
||||
assert.Equal(t, tc.wantValue, gotValue)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
var testEntityType = MustNewEntityType("dashboard")
|
||||
|
||||
type fakeStore struct {
|
||||
tags []*Tag
|
||||
listCallCount int
|
||||
}
|
||||
|
||||
func (f *fakeStore) List(_ context.Context, _ valuer.UUID, _ EntityType) ([]*Tag, error) {
|
||||
f.listCallCount++
|
||||
out := make([]*Tag, len(f.tags))
|
||||
copy(out, f.tags)
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (f *fakeStore) Create(_ context.Context, tags []*Tag) ([]*Tag, error) {
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
func (f *fakeStore) CreateRelations(_ context.Context, _ []*TagRelation) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeStore) ListByEntity(_ context.Context, _ EntityType, _ valuer.UUID) ([]*Tag, error) {
|
||||
return []*Tag{}, nil
|
||||
}
|
||||
|
||||
func (f *fakeStore) ListByEntities(_ context.Context, _ EntityType, _ []valuer.UUID) (map[valuer.UUID][]*Tag, error) {
|
||||
return map[valuer.UUID][]*Tag{}, nil
|
||||
}
|
||||
|
||||
func (f *fakeStore) DeleteRelationsExcept(_ context.Context, _ EntityType, _ valuer.UUID, _ []valuer.UUID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestResolve(t *testing.T) {
|
||||
t.Run("empty input does not hit store", func(t *testing.T) {
|
||||
store := &fakeStore{}
|
||||
toCreate, matched, err := Resolve(context.Background(), store, valuer.GenerateUUID(), testEntityType, nil, "u@signoz.io")
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, toCreate)
|
||||
assert.Empty(t, matched)
|
||||
assert.Zero(t, store.listCallCount, "should not hit store when input is empty")
|
||||
})
|
||||
|
||||
t.Run("creates missing pairs and reuses existing", func(t *testing.T) {
|
||||
orgID := valuer.GenerateUUID()
|
||||
dbTag := NewTag(orgID, testEntityType, "team", "Pulse", "seed")
|
||||
dbTag2 := NewTag(orgID, testEntityType, "Database", "redis", "seed")
|
||||
store := &fakeStore{tags: []*Tag{dbTag, dbTag2}}
|
||||
|
||||
toCreate, matched, err := Resolve(context.Background(), store, orgID, testEntityType, []PostableTag{
|
||||
{Key: "team", Value: "events"}, // new
|
||||
{Key: "DATABASE", Value: "REDIS"}, // case-only conflict
|
||||
{Key: "Brand", Value: "New"}, // new
|
||||
}, "u@signoz.io")
|
||||
require.NoError(t, err)
|
||||
|
||||
createdLowerKVs := []string{}
|
||||
for _, tg := range toCreate {
|
||||
createdLowerKVs = append(createdLowerKVs, strings.ToLower(tg.Key)+"\x00"+strings.ToLower(tg.Value))
|
||||
}
|
||||
assert.ElementsMatch(t, []string{"team\x00events", "brand\x00new"}, createdLowerKVs,
|
||||
"only the two missing pairs should be returned for insertion")
|
||||
|
||||
require.Len(t, matched, 1, "DATABASE:REDIS should hit the existing 'Database:redis' tag")
|
||||
assert.Same(t, dbTag2, matched[0], "matched should return the existing pointer with its authoritative ID")
|
||||
})
|
||||
|
||||
t.Run("dedupes inputs that map to the same lower(key)+lower(value)", func(t *testing.T) {
|
||||
orgID := valuer.GenerateUUID()
|
||||
store := &fakeStore{}
|
||||
|
||||
toCreate, matched, err := Resolve(context.Background(), store, orgID, testEntityType, []PostableTag{
|
||||
{Key: "Foo", Value: "Bar"},
|
||||
{Key: "foo", Value: "bar"},
|
||||
{Key: "FOO", Value: "BAR"},
|
||||
}, "u@signoz.io")
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Empty(t, matched)
|
||||
require.Len(t, toCreate, 1, "duplicate inputs must collapse into a single insert")
|
||||
assert.Equal(t, "Foo", toCreate[0].Key, "first input's casing wins")
|
||||
assert.Equal(t, "Bar", toCreate[0].Value, "first input's casing wins")
|
||||
})
|
||||
|
||||
t.Run("preserves existing casing on case-only match", func(t *testing.T) {
|
||||
orgID := valuer.GenerateUUID()
|
||||
dbTag := NewTag(orgID, testEntityType, "Team", "Pulse", "seed")
|
||||
store := &fakeStore{tags: []*Tag{dbTag}}
|
||||
|
||||
toCreate, matched, err := Resolve(context.Background(), store, orgID, testEntityType, []PostableTag{
|
||||
{Key: "team", Value: "PULSE"},
|
||||
}, "u@signoz.io")
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Empty(t, toCreate)
|
||||
require.Len(t, matched, 1)
|
||||
assert.Equal(t, "Team", matched[0].Key)
|
||||
assert.Equal(t, "Pulse", matched[0].Value)
|
||||
})
|
||||
|
||||
t.Run("propagates validation error from any input", func(t *testing.T) {
|
||||
store := &fakeStore{}
|
||||
_, _, err := Resolve(context.Background(), store, valuer.GenerateUUID(), testEntityType, []PostableTag{
|
||||
{Key: "team", Value: "pulse"},
|
||||
{Key: "", Value: "x"},
|
||||
}, "u@signoz.io")
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("propagates slash validation error", func(t *testing.T) {
|
||||
store := &fakeStore{}
|
||||
_, _, err := Resolve(context.Background(), store, valuer.GenerateUUID(), testEntityType, []PostableTag{
|
||||
{Key: "team/eng", Value: "pulse"},
|
||||
}, "u@signoz.io")
|
||||
require.Error(t, err)
|
||||
assert.True(t, strings.Contains(err.Error(), "/"), "error should reference the disallowed character")
|
||||
})
|
||||
}
|
||||
@@ -20,7 +20,7 @@ from fixtures.querier import (
|
||||
|
||||
|
||||
def _get_bodies(response: requests.Response) -> list[dict[str, Any]]:
|
||||
return [row["data"]["body"] for row in get_rows(response)]
|
||||
return [json.loads(row["data"]["body"]) for row in get_rows(response)]
|
||||
|
||||
|
||||
def _run_query_case(signoz: types.SigNoz, token: str, now: datetime, case: dict[str, Any]) -> None:
|
||||
@@ -1183,7 +1183,7 @@ def test_message_searches(
|
||||
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
|
||||
|
||||
def _body_messages(response: requests.Response) -> list[str]:
|
||||
return [row["data"]["body"].get("message", "") for row in get_rows(response)]
|
||||
return [json.loads(row["data"]["body"]).get("message", "") for row in get_rows(response)]
|
||||
|
||||
payment_messages = {
|
||||
"Payment processed successfully",
|
||||
|
||||
Reference in New Issue
Block a user