Compare commits

..

1 Commits

Author SHA1 Message Date
nityanandagohain
ef748e9ce2 fix: add userauditable to ttl setting 2026-05-11 10:28:38 +05:30
36 changed files with 119 additions and 2021 deletions

View File

@@ -8,7 +8,6 @@ import (
"github.com/spf13/cobra"
"github.com/SigNoz/signoz/cmd"
"github.com/SigNoz/signoz/ee/auditor/fileauditor"
"github.com/SigNoz/signoz/ee/auditor/otlphttpauditor"
"github.com/SigNoz/signoz/ee/authn/callbackauthn/oidccallbackauthn"
"github.com/SigNoz/signoz/ee/authn/callbackauthn/samlcallbackauthn"
@@ -156,9 +155,6 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
if err := factories.Add(otlphttpauditor.NewFactory(licensing, version.Info)); err != nil {
panic(err)
}
if err := factories.Add(fileauditor.NewFactory(licensing, version.Info)); err != nil {
panic(err)
}
return factories
},
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {

View File

@@ -2599,54 +2599,6 @@ components:
- requiredMetricsCheck
- endTimeBeforeRetention
type: object
InframonitoringtypesNamespaceRecord:
properties:
meta:
additionalProperties:
type: string
nullable: true
type: object
namespaceCPU:
format: double
type: number
namespaceMemory:
format: double
type: number
namespaceName:
type: string
podCountsByPhase:
$ref: '#/components/schemas/InframonitoringtypesPodCountsByPhase'
required:
- namespaceName
- namespaceCPU
- namespaceMemory
- podCountsByPhase
- meta
type: object
InframonitoringtypesNamespaces:
properties:
endTimeBeforeRetention:
type: boolean
records:
items:
$ref: '#/components/schemas/InframonitoringtypesNamespaceRecord'
nullable: true
type: array
requiredMetricsCheck:
$ref: '#/components/schemas/InframonitoringtypesRequiredMetricsCheck'
total:
type: integer
type:
$ref: '#/components/schemas/InframonitoringtypesResponseType'
warning:
$ref: '#/components/schemas/Querybuildertypesv5QueryWarnData'
required:
- type
- records
- total
- requiredMetricsCheck
- endTimeBeforeRetention
type: object
InframonitoringtypesNodeCondition:
enum:
- ready
@@ -2850,32 +2802,6 @@ components:
- end
- limit
type: object
InframonitoringtypesPostableNamespaces:
properties:
end:
format: int64
type: integer
filter:
$ref: '#/components/schemas/Querybuildertypesv5Filter'
groupBy:
items:
$ref: '#/components/schemas/Querybuildertypesv5GroupByKey'
nullable: true
type: array
limit:
type: integer
offset:
type: integer
orderBy:
$ref: '#/components/schemas/Querybuildertypesv5OrderBy'
start:
format: int64
type: integer
required:
- start
- end
- limit
type: object
InframonitoringtypesPostableNodes:
properties:
end:
@@ -11814,74 +11740,6 @@ paths:
summary: List Hosts for Infra Monitoring
tags:
- inframonitoring
/api/v2/infra_monitoring/namespaces:
post:
deprecated: false
description: 'Returns a paginated list of Kubernetes namespaces with key aggregated
pod metrics: CPU usage and memory working set (summed across pods in the group),
plus per-group podCountsByPhase ({ pending, running, succeeded, failed, unknown
} from each pod''s latest k8s.pod.phase value in the window). Each namespace
includes metadata attributes (k8s.namespace.name, k8s.cluster.name). The response
type is ''list'' for the default k8s.namespace.name grouping or ''grouped_list''
for custom groupBy keys; in both modes every row aggregates pods in the group.
Supports filtering via a filter expression, custom groupBy, ordering by cpu
/ memory, and pagination via offset/limit. Also reports missing required metrics
and whether the requested time range falls before the data retention boundary.
Numeric metric fields (namespaceCPU, namespaceMemory) return -1 as a sentinel
when no data is available for that field.'
operationId: ListNamespaces
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/InframonitoringtypesPostableNamespaces'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/InframonitoringtypesNamespaces'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: List Namespaces for Infra Monitoring
tags:
- inframonitoring
/api/v2/infra_monitoring/nodes:
post:
deprecated: false

View File

@@ -1,33 +0,0 @@
package fileauditor
import (
"context"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/audittypes"
)
func (provider *provider) export(ctx context.Context, events []audittypes.AuditEvent) error {
logs := audittypes.NewPLogsFromAuditEvents(events, "signoz", provider.build.Version(), "signoz.audit")
payload, err := provider.marshaler.MarshalLogs(logs)
if err != nil {
return err
}
// Combine the payload and trailing newline into one Write call so the line
// is emitted in a single syscall — concurrent readers see either the full
// line or nothing, never a torn JSON object.
payload = append(payload, '\n')
provider.mu.Lock()
defer provider.mu.Unlock()
if _, err := provider.file.Write(payload); err != nil {
provider.settings.Logger().ErrorContext(ctx, "audit export failed", errors.Attr(err), slog.Int("dropped_log_records", len(events)))
return err
}
return provider.file.Sync()
}

View File

@@ -1,100 +0,0 @@
package fileauditor
import (
"context"
"os"
"sync"
"github.com/SigNoz/signoz/pkg/auditor"
"github.com/SigNoz/signoz/pkg/auditor/auditorserver"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/types/audittypes"
"github.com/SigNoz/signoz/pkg/version"
"go.opentelemetry.io/collector/pdata/plog"
)
var _ auditor.Auditor = (*provider)(nil)
type provider struct {
settings factory.ScopedProviderSettings
config auditor.Config
licensing licensing.Licensing
build version.Build
server *auditorserver.Server
marshaler plog.JSONMarshaler
file *os.File
mu sync.Mutex
}
func NewFactory(licensing licensing.Licensing, build version.Build) factory.ProviderFactory[auditor.Auditor, auditor.Config] {
return factory.NewProviderFactory(factory.MustNewName("file"), func(ctx context.Context, providerSettings factory.ProviderSettings, config auditor.Config) (auditor.Auditor, error) {
return newProvider(ctx, providerSettings, config, licensing, build)
})
}
func newProvider(_ context.Context, providerSettings factory.ProviderSettings, config auditor.Config, licensing licensing.Licensing, build version.Build) (auditor.Auditor, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/ee/auditor/fileauditor")
file, err := os.OpenFile(config.File.Path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInvalidInput, auditor.ErrCodeAuditExportFailed, "failed to open audit file %q", config.File.Path)
}
provider := &provider{
settings: settings,
config: config,
licensing: licensing,
build: build,
marshaler: plog.JSONMarshaler{},
file: file,
}
server, err := auditorserver.New(settings,
auditorserver.Config{
BufferSize: config.BufferSize,
BatchSize: config.BatchSize,
FlushInterval: config.FlushInterval,
},
provider.export,
)
if err != nil {
_ = file.Close()
return nil, err
}
provider.server = server
return provider, nil
}
func (provider *provider) Start(ctx context.Context) error {
return provider.server.Start(ctx)
}
func (provider *provider) Audit(ctx context.Context, event audittypes.AuditEvent) {
if event.PrincipalAttributes.PrincipalOrgID.IsZero() {
provider.settings.Logger().WarnContext(ctx, "audit event dropped as org_id is zero")
return
}
if _, err := provider.licensing.GetActive(ctx, event.PrincipalAttributes.PrincipalOrgID); err != nil {
return
}
provider.server.Add(ctx, event)
}
func (provider *provider) Healthy() <-chan struct{} {
return provider.server.Healthy()
}
func (provider *provider) Stop(ctx context.Context) error {
serverErr := provider.server.Stop(ctx)
fileErr := provider.file.Close()
if serverErr != nil {
return serverErr
}
return fileErr
}

View File

@@ -13,11 +13,9 @@ import type {
import type {
InframonitoringtypesPostableHostsDTO,
InframonitoringtypesPostableNamespacesDTO,
InframonitoringtypesPostableNodesDTO,
InframonitoringtypesPostablePodsDTO,
ListHosts200,
ListNamespaces200,
ListNodes200,
ListPods200,
RenderErrorResponseDTO,
@@ -110,90 +108,6 @@ export const useListHosts = <
return useMutation(mutationOptions);
};
/**
* Returns a paginated list of Kubernetes namespaces with key aggregated pod metrics: CPU usage and memory working set (summed across pods in the group), plus per-group podCountsByPhase ({ pending, running, succeeded, failed, unknown } from each pod's latest k8s.pod.phase value in the window). Each namespace includes metadata attributes (k8s.namespace.name, k8s.cluster.name). The response type is 'list' for the default k8s.namespace.name grouping or 'grouped_list' for custom groupBy keys; in both modes every row aggregates pods in the group. Supports filtering via a filter expression, custom groupBy, ordering by cpu / memory, and pagination via offset/limit. Also reports missing required metrics and whether the requested time range falls before the data retention boundary. Numeric metric fields (namespaceCPU, namespaceMemory) return -1 as a sentinel when no data is available for that field.
* @summary List Namespaces for Infra Monitoring
*/
export const listNamespaces = (
inframonitoringtypesPostableNamespacesDTO: BodyType<InframonitoringtypesPostableNamespacesDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<ListNamespaces200>({
url: `/api/v2/infra_monitoring/namespaces`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: inframonitoringtypesPostableNamespacesDTO,
signal,
});
};
export const getListNamespacesMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof listNamespaces>>,
TError,
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> },
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof listNamespaces>>,
TError,
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> },
TContext
> => {
const mutationKey = ['listNamespaces'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof listNamespaces>>,
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> }
> = (props) => {
const { data } = props ?? {};
return listNamespaces(data);
};
return { mutationFn, ...mutationOptions };
};
export type ListNamespacesMutationResult = NonNullable<
Awaited<ReturnType<typeof listNamespaces>>
>;
export type ListNamespacesMutationBody =
BodyType<InframonitoringtypesPostableNamespacesDTO>;
export type ListNamespacesMutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary List Namespaces for Infra Monitoring
*/
export const useListNamespaces = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof listNamespaces>>,
TError,
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> },
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof listNamespaces>>,
TError,
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> },
TContext
> => {
const mutationOptions = getListNamespacesMutationOptions(options);
return useMutation(mutationOptions);
};
/**
* Returns a paginated list of Kubernetes nodes with key metrics: CPU usage, CPU allocatable, memory working set, memory allocatable, per-group nodeCountsByReadiness ({ ready, notReady } from each node's latest k8s.node.condition_ready in the window) and per-group podCountsByPhase ({ pending, running, succeeded, failed, unknown } for pods scheduled on the listed nodes). Each node includes metadata attributes (k8s.node.uid, k8s.cluster.name). The response type is 'list' for the default k8s.node.name grouping (each row is one node with its current condition string: ready / not_ready / no_data) or 'grouped_list' for custom groupBy keys (each row aggregates nodes in the group; condition stays no_data). Supports filtering via a filter expression, custom groupBy, ordering by cpu / cpu_allocatable / memory / memory_allocatable, and pagination via offset/limit. Also reports missing required metrics and whether the requested time range falls before the data retention boundary. Numeric metric fields (nodeCPU, nodeCPUAllocatable, nodeMemory, nodeMemoryAllocatable) return -1 as a sentinel when no data is available for that field.
* @summary List Nodes for Infra Monitoring

View File

@@ -4653,55 +4653,6 @@ export interface InframonitoringtypesHostsDTO {
warning?: Querybuildertypesv5QueryWarnDataDTO;
}
/**
* @nullable
*/
export type InframonitoringtypesNamespaceRecordDTOMeta = {
[key: string]: string;
} | null;
export interface InframonitoringtypesNamespaceRecordDTO {
/**
* @type object
* @nullable true
*/
meta: InframonitoringtypesNamespaceRecordDTOMeta;
/**
* @type number
* @format double
*/
namespaceCPU: number;
/**
* @type number
* @format double
*/
namespaceMemory: number;
/**
* @type string
*/
namespaceName: string;
podCountsByPhase: InframonitoringtypesPodCountsByPhaseDTO;
}
export interface InframonitoringtypesNamespacesDTO {
/**
* @type boolean
*/
endTimeBeforeRetention: boolean;
/**
* @type array
* @nullable true
*/
records: InframonitoringtypesNamespaceRecordDTO[] | null;
requiredMetricsCheck: InframonitoringtypesRequiredMetricsCheckDTO;
/**
* @type integer
*/
total: number;
type: InframonitoringtypesResponseTypeDTO;
warning?: Querybuildertypesv5QueryWarnDataDTO;
}
export enum InframonitoringtypesNodeConditionDTO {
ready = 'ready',
not_ready = 'not_ready',
@@ -4913,34 +4864,6 @@ export interface InframonitoringtypesPostableHostsDTO {
start: number;
}
export interface InframonitoringtypesPostableNamespacesDTO {
/**
* @type integer
* @format int64
*/
end: number;
filter?: Querybuildertypesv5FilterDTO;
/**
* @type array
* @nullable true
*/
groupBy?: Querybuildertypesv5GroupByKeyDTO[] | null;
/**
* @type integer
*/
limit: number;
/**
* @type integer
*/
offset?: number;
orderBy?: Querybuildertypesv5OrderByDTO;
/**
* @type integer
* @format int64
*/
start: number;
}
export interface InframonitoringtypesPostableNodesDTO {
/**
* @type integer
@@ -9332,14 +9255,6 @@ export type ListHosts200 = {
status: string;
};
export type ListNamespaces200 = {
data: InframonitoringtypesNamespacesDTO;
/**
* @type string
*/
status: string;
};
export type ListNodes200 = {
data: InframonitoringtypesNodesDTO;
/**

View File

@@ -398,7 +398,7 @@ describe('useTableParams (selective URL mode — partial config object)', () =>
.filter(Boolean)
.pop();
expect(lastExpanded).toBeDefined();
expect(JSON.parse(lastExpanded!)).toStrictEqual(
expect(JSON.parse(lastExpanded!)).toEqual(
expect.arrayContaining(['row-1', 'row-2']),
);

View File

@@ -67,24 +67,5 @@ func (provider *provider) addInfraMonitoringRoutes(router *mux.Router) error {
return err
}
if err := router.Handle("/api/v2/infra_monitoring/namespaces", handler.New(
provider.authzMiddleware.ViewAccess(provider.infraMonitoringHandler.ListNamespaces),
handler.OpenAPIDef{
ID: "ListNamespaces",
Tags: []string{"inframonitoring"},
Summary: "List Namespaces for Infra Monitoring",
Description: "Returns a paginated list of Kubernetes namespaces with key aggregated pod metrics: CPU usage and memory working set (summed across pods in the group), plus per-group podCountsByPhase ({ pending, running, succeeded, failed, unknown } from each pod's latest k8s.pod.phase value in the window). Each namespace includes metadata attributes (k8s.namespace.name, k8s.cluster.name). The response type is 'list' for the default k8s.namespace.name grouping or 'grouped_list' for custom groupBy keys; in both modes every row aggregates pods in the group. Supports filtering via a filter expression, custom groupBy, ordering by cpu / memory, and pagination via offset/limit. Also reports missing required metrics and whether the requested time range falls before the data retention boundary. Numeric metric fields (namespaceCPU, namespaceMemory) return -1 as a sentinel when no data is available for that field.",
Request: new(inframonitoringtypes.PostableNamespaces),
RequestContentType: "application/json",
Response: new(inframonitoringtypes.Namespaces),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodPost).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -25,8 +25,6 @@ type Config struct {
FlushInterval time.Duration `mapstructure:"flush_interval"`
OTLPHTTP OTLPHTTPConfig `mapstructure:"otlphttp"`
File FileConfig `mapstructure:"file"`
}
// OTLPHTTPConfig holds configuration for the OTLP HTTP exporter provider.
@@ -48,12 +46,6 @@ type OTLPHTTPConfig struct {
Retry RetryConfig `mapstructure:"retry"`
}
type FileConfig struct {
// Path is the absolute path to the audit log file. The file is opened with
// O_APPEND|O_CREATE|O_WRONLY; existing contents are preserved across runs.
Path string `mapstructure:"path"`
}
// RetryConfig configures exponential backoff for the OTLP HTTP exporter.
type RetryConfig struct {
// Enabled controls whether retries are attempted on transient failures.
@@ -119,11 +111,5 @@ func (c Config) Validate() error {
}
}
if c.Provider == "file" {
if c.File.Path == "" {
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "auditor::file::path must be set when provider is file")
}
}
return nil
}

View File

@@ -93,27 +93,3 @@ func (h *handler) ListNodes(rw http.ResponseWriter, req *http.Request) {
render.Success(rw, http.StatusOK, result)
}
func (h *handler) ListNamespaces(rw http.ResponseWriter, req *http.Request) {
claims, err := authtypes.ClaimsFromContext(req.Context())
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
var parsedReq inframonitoringtypes.PostableNamespaces
if err := binding.JSON.BindBody(req.Body, &parsedReq); err != nil {
render.Error(rw, err)
return
}
result, err := h.module.ListNamespaces(req.Context(), orgID, &parsedReq)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, result)
}

View File

@@ -337,92 +337,3 @@ func (m *module) ListNodes(ctx context.Context, orgID valuer.UUID, req *inframon
return resp, nil
}
func (m *module) ListNamespaces(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableNamespaces) (*inframonitoringtypes.Namespaces, error) {
if err := req.Validate(); err != nil {
return nil, err
}
resp := &inframonitoringtypes.Namespaces{}
if req.OrderBy == nil {
req.OrderBy = &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: inframonitoringtypes.NamespacesOrderByCPU,
},
},
Direction: qbtypes.OrderDirectionDesc,
}
}
if len(req.GroupBy) == 0 {
req.GroupBy = []qbtypes.GroupByKey{namespaceNameGroupByKey}
resp.Type = inframonitoringtypes.ResponseTypeList
} else {
resp.Type = inframonitoringtypes.ResponseTypeGroupedList
}
missingMetrics, minFirstReportedUnixMilli, err := m.getMetricsExistenceAndEarliestTime(ctx, namespacesTableMetricNamesList)
if err != nil {
return nil, err
}
if len(missingMetrics) > 0 {
resp.RequiredMetricsCheck = inframonitoringtypes.RequiredMetricsCheck{MissingMetrics: missingMetrics}
resp.Records = []inframonitoringtypes.NamespaceRecord{}
resp.Total = 0
return resp, nil
}
if req.End < int64(minFirstReportedUnixMilli) {
resp.EndTimeBeforeRetention = true
resp.Records = []inframonitoringtypes.NamespaceRecord{}
resp.Total = 0
return resp, nil
}
resp.RequiredMetricsCheck = inframonitoringtypes.RequiredMetricsCheck{MissingMetrics: []string{}}
metadataMap, err := m.getNamespacesTableMetadata(ctx, req)
if err != nil {
return nil, err
}
resp.Total = len(metadataMap)
pageGroups, err := m.getTopNamespaceGroups(ctx, orgID, req, metadataMap)
if err != nil {
return nil, err
}
if len(pageGroups) == 0 {
resp.Records = []inframonitoringtypes.NamespaceRecord{}
return resp, nil
}
filterExpr := ""
if req.Filter != nil {
filterExpr = req.Filter.Expression
}
fullQueryReq := buildFullQueryRequest(req.Start, req.End, filterExpr, req.GroupBy, pageGroups, m.newNamespacesTableListQuery())
queryResp, err := m.querier.QueryRange(ctx, orgID, fullQueryReq)
if err != nil {
return nil, err
}
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
// Start/End/Filter/GroupBy from PostablePods.
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
Start: req.Start,
End: req.End,
Filter: req.Filter,
GroupBy: req.GroupBy,
}, pageGroups)
if err != nil {
return nil, err
}
resp.Records = buildNamespaceRecords(queryResp, pageGroups, req.GroupBy, metadataMap, phaseCounts)
resp.Warning = queryResp.Warning
return resp, nil
}

View File

@@ -1,123 +0,0 @@
package implinframonitoring
import (
"context"
"slices"
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
)
// buildNamespaceRecords assembles the page records. Pod phase counts come from
// phaseCounts in both modes; every row is a group of pods, so there's no
// per-row "current phase" concept (unlike pods/nodes list mode).
func buildNamespaceRecords(
resp *qbtypes.QueryRangeResponse,
pageGroups []map[string]string,
groupBy []qbtypes.GroupByKey,
metadataMap map[string]map[string]string,
phaseCounts map[string]podPhaseCounts,
) []inframonitoringtypes.NamespaceRecord {
metricsMap := parseFullQueryResponse(resp, groupBy)
records := make([]inframonitoringtypes.NamespaceRecord, 0, len(pageGroups))
for _, labels := range pageGroups {
compositeKey := compositeKeyFromLabels(labels, groupBy)
namespaceName := labels[namespaceNameAttrKey]
record := inframonitoringtypes.NamespaceRecord{ // initialize with default values
NamespaceName: namespaceName,
NamespaceCPU: -1,
NamespaceMemory: -1,
Meta: map[string]string{},
}
if metrics, ok := metricsMap[compositeKey]; ok {
if v, exists := metrics["A"]; exists {
record.NamespaceCPU = v
}
if v, exists := metrics["D"]; exists {
record.NamespaceMemory = v
}
}
if phaseCountsForGroup, ok := phaseCounts[compositeKey]; ok {
record.PodCountsByPhase = inframonitoringtypes.PodCountsByPhase{
Pending: phaseCountsForGroup.Pending,
Running: phaseCountsForGroup.Running,
Succeeded: phaseCountsForGroup.Succeeded,
Failed: phaseCountsForGroup.Failed,
Unknown: phaseCountsForGroup.Unknown,
}
}
if attrs, ok := metadataMap[compositeKey]; ok {
for k, v := range attrs {
record.Meta[k] = v
}
}
records = append(records, record)
}
return records
}
func (m *module) getTopNamespaceGroups(
ctx context.Context,
orgID valuer.UUID,
req *inframonitoringtypes.PostableNamespaces,
metadataMap map[string]map[string]string,
) ([]map[string]string, error) {
orderByKey := req.OrderBy.Key.Name
queryNamesForOrderBy := orderByToNamespacesQueryNames[orderByKey]
rankingQueryName := queryNamesForOrderBy[len(queryNamesForOrderBy)-1]
topReq := &qbtypes.QueryRangeRequest{
Start: uint64(req.Start),
End: uint64(req.End),
RequestType: qbtypes.RequestTypeScalar,
CompositeQuery: qbtypes.CompositeQuery{
Queries: make([]qbtypes.QueryEnvelope, 0, len(queryNamesForOrderBy)),
},
}
for _, envelope := range m.newNamespacesTableListQuery().CompositeQuery.Queries {
if !slices.Contains(queryNamesForOrderBy, envelope.GetQueryName()) {
continue
}
copied := envelope
if copied.Type == qbtypes.QueryTypeBuilder {
existingExpr := ""
if f := copied.GetFilter(); f != nil {
existingExpr = f.Expression
}
reqFilterExpr := ""
if req.Filter != nil {
reqFilterExpr = req.Filter.Expression
}
merged := mergeFilterExpressions(existingExpr, reqFilterExpr)
copied.SetFilter(&qbtypes.Filter{Expression: merged})
copied.SetGroupBy(req.GroupBy)
}
topReq.CompositeQuery.Queries = append(topReq.CompositeQuery.Queries, copied)
}
resp, err := m.querier.QueryRange(ctx, orgID, topReq)
if err != nil {
return nil, err
}
allMetricGroups := parseAndSortGroups(resp, rankingQueryName, req.GroupBy, req.OrderBy.Direction)
return paginateWithBackfill(allMetricGroups, metadataMap, req.GroupBy, req.Offset, req.Limit), nil
}
func (m *module) getNamespacesTableMetadata(ctx context.Context, req *inframonitoringtypes.PostableNamespaces) (map[string]map[string]string, error) {
var nonGroupByAttrs []string
for _, key := range namespaceAttrKeysForMetadata {
if !isKeyInGroupByAttrs(req.GroupBy, key) {
nonGroupByAttrs = append(nonGroupByAttrs, key)
}
}
return m.getMetadata(ctx, namespacesTableMetricNamesList, req.GroupBy, nonGroupByAttrs, req.Filter, req.Start, req.End)
}

View File

@@ -1,92 +0,0 @@
package implinframonitoring
import (
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
const (
namespaceNameAttrKey = "k8s.namespace.name"
)
var namespaceNameGroupByKey = qbtypes.GroupByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: namespaceNameAttrKey,
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
}
// namespacesTableMetricNamesList drives the existence/retention check.
// Includes k8s.pod.phase so the response short-circuits cleanly when a
// cluster doesn't ship the metric — even though phase isn't part of the
// QB composite query (it's queried separately via getPerGroupPodPhaseCounts).
var namespacesTableMetricNamesList = []string{
"k8s.pod.cpu.usage",
"k8s.pod.memory.working_set",
"k8s.pod.phase",
}
var namespaceAttrKeysForMetadata = []string{
"k8s.namespace.name",
"k8s.cluster.name",
}
var orderByToNamespacesQueryNames = map[string][]string{
inframonitoringtypes.NamespacesOrderByCPU: {"A"},
inframonitoringtypes.NamespacesOrderByMemory: {"D"},
}
// newNamespacesTableListQuery builds the composite QB v5 request for the namespaces list.
// Pod phase counts are derived separately via getPerGroupPodPhaseCounts (works for both
// list and grouped_list modes), so no phase query is included here.
// Query letters A and D are kept aligned with the v1 implementation.
func (m *module) newNamespacesTableListQuery() *qbtypes.QueryRangeRequest {
queries := []qbtypes.QueryEnvelope{
// Query A: CPU usage — sum of pod CPU within the group.
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.cpu.usage",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{namespaceNameGroupByKey},
Disabled: false,
},
},
// Query D: Memory working set — sum of pod memory within the group.
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "D",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.memory.working_set",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{namespaceNameGroupByKey},
Disabled: false,
},
},
}
return &qbtypes.QueryRangeRequest{
RequestType: qbtypes.RequestTypeScalar,
CompositeQuery: qbtypes.CompositeQuery{
Queries: queries,
},
}
}

View File

@@ -12,12 +12,10 @@ type Handler interface {
ListHosts(http.ResponseWriter, *http.Request)
ListPods(http.ResponseWriter, *http.Request)
ListNodes(http.ResponseWriter, *http.Request)
ListNamespaces(http.ResponseWriter, *http.Request)
}
type Module interface {
ListHosts(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableHosts) (*inframonitoringtypes.Hosts, error)
ListPods(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostablePods) (*inframonitoringtypes.Pods, error)
ListNodes(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableNodes) (*inframonitoringtypes.Nodes, error)
ListNamespaces(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableNamespaces) (*inframonitoringtypes.Namespaces, error)
}

View File

@@ -1,57 +0,0 @@
package impltag
import (
"context"
"github.com/SigNoz/signoz/pkg/modules/tag"
"github.com/SigNoz/signoz/pkg/types/tagtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type module struct {
store tagtypes.Store
}
func NewModule(store tagtypes.Store) tag.Module {
return &module{store: store}
}
func (m *module) CreateMany(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType, postable []tagtypes.PostableTag, createdBy string) ([]*tagtypes.Tag, error) {
if len(postable) == 0 {
return []*tagtypes.Tag{}, nil
}
toCreate, matched, err := tagtypes.Resolve(ctx, m.store, orgID, entityType, postable, createdBy)
if err != nil {
return nil, err
}
created, err := m.store.Create(ctx, toCreate)
if err != nil {
return nil, err
}
return append(matched, created...), nil
}
func (m *module) LinkToEntity(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType, entityID valuer.UUID, tagIDs []valuer.UUID) error {
if len(tagIDs) == 0 {
return nil
}
return m.store.CreateRelations(ctx, tagtypes.NewTagRelations(orgID, entityType, entityID, tagIDs))
}
func (m *module) SyncLinksForEntity(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType, entityID valuer.UUID, tagIDs []valuer.UUID) error {
if err := m.store.CreateRelations(ctx, tagtypes.NewTagRelations(orgID, entityType, entityID, tagIDs)); err != nil {
return err
}
return m.store.DeleteRelationsExcept(ctx, entityType, entityID, tagIDs)
}
func (m *module) ListForEntity(ctx context.Context, entityType tagtypes.EntityType, entityID valuer.UUID) ([]*tagtypes.Tag, error) {
return m.store.ListByEntity(ctx, entityType, entityID)
}
func (m *module) ListForEntities(ctx context.Context, entityType tagtypes.EntityType, entityIDs []valuer.UUID) (map[valuer.UUID][]*tagtypes.Tag, error) {
return m.store.ListByEntities(ctx, entityType, entityIDs)
}

View File

@@ -1,132 +0,0 @@
package impltag
import (
"context"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/tagtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
type store struct {
sqlstore sqlstore.SQLStore
}
func NewStore(sqlstore sqlstore.SQLStore) tagtypes.Store {
return &store{sqlstore: sqlstore}
}
func (s *store) List(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType) ([]*tagtypes.Tag, error) {
tags := make([]*tagtypes.Tag, 0)
err := s.sqlstore.
BunDBCtx(ctx).
NewSelect().
Model(&tags).
Where("org_id = ?", orgID).
Where("entity_type = ?", entityType).
Scan(ctx)
if err != nil {
return nil, err
}
return tags, nil
}
func (s *store) ListByEntity(ctx context.Context, entityType tagtypes.EntityType, entityID valuer.UUID) ([]*tagtypes.Tag, error) {
tags := make([]*tagtypes.Tag, 0)
err := s.sqlstore.
BunDBCtx(ctx).
NewSelect().
Model(&tags).
Join("JOIN tag_relations AS tr ON tr.tag_id = tag.id").
Where("tr.entity_type = ?", entityType).
Where("tr.entity_id = ?", entityID).
Scan(ctx)
if err != nil {
return nil, err
}
return tags, nil
}
func (s *store) ListByEntities(ctx context.Context, entityType tagtypes.EntityType, entityIDs []valuer.UUID) (map[valuer.UUID][]*tagtypes.Tag, error) {
if len(entityIDs) == 0 {
return map[valuer.UUID][]*tagtypes.Tag{}, nil
}
type joinedRow struct {
tagtypes.Tag
EntityID valuer.UUID `bun:"entity_id"`
}
rows := make([]*joinedRow, 0)
err := s.sqlstore.
BunDBCtx(ctx).
NewSelect().
Model(&rows).
ColumnExpr("tag.*, tr.entity_id").
Join("JOIN tag_relations AS tr ON tr.tag_id = tag.id").
Where("tr.entity_type = ?", entityType).
Where("tr.entity_id IN (?)", bun.In(entityIDs)).
Scan(ctx)
if err != nil {
return nil, err
}
out := make(map[valuer.UUID][]*tagtypes.Tag)
for _, r := range rows {
tag := r.Tag
out[r.EntityID] = append(out[r.EntityID], &tag)
}
return out, nil
}
func (s *store) Create(ctx context.Context, tags []*tagtypes.Tag) ([]*tagtypes.Tag, error) {
if len(tags) == 0 {
return tags, nil
}
// DO UPDATE on a self-set is a deliberate no-op write whose only purpose
// is to make RETURNING fire on conflicting rows. Without it, RETURNING is
// silent on the conflict path and we'd have to refetch by (key, value) to
// learn the existing rows' IDs after a concurrent-insert race. Setting
// key = tag.key (the existing row's value) preserves the first writer's
// casing on case-only collisions.
err := s.sqlstore.
BunDBCtx(ctx).
NewInsert().
Model(&tags).
On("CONFLICT (org_id, entity_type, (LOWER(key)), (LOWER(value))) DO UPDATE").
Set("key = tag.key").
Returning("*").
Scan(ctx)
if err != nil {
return nil, err
}
return tags, nil
}
func (s *store) CreateRelations(ctx context.Context, relations []*tagtypes.TagRelation) error {
if len(relations) == 0 {
return nil
}
_, err := s.sqlstore.
BunDBCtx(ctx).
NewInsert().
Model(&relations).
On("CONFLICT (entity_type, entity_id, tag_id) DO NOTHING").
Exec(ctx)
return err
}
func (s *store) DeleteRelationsExcept(ctx context.Context, entityType tagtypes.EntityType, entityID valuer.UUID, keepTagIDs []valuer.UUID) error {
q := s.sqlstore.
BunDBCtx(ctx).
NewDelete().
Model((*tagtypes.TagRelation)(nil)).
Where("entity_type = ?", entityType).
Where("entity_id = ?", entityID)
if len(keepTagIDs) > 0 {
q = q.Where("tag_id NOT IN (?)", bun.In(keepTagIDs))
}
_, err := q.Exec(ctx)
return err
}

View File

@@ -1,146 +0,0 @@
package impltag
import (
"context"
"path/filepath"
"strings"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/factory/factorytest"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/sqlstore/sqlitesqlstore"
"github.com/SigNoz/signoz/pkg/types/tagtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uptrace/bun"
)
func newTestStore(t *testing.T) sqlstore.SQLStore {
t.Helper()
dbPath := filepath.Join(t.TempDir(), "test.db")
store, err := sqlitesqlstore.New(context.Background(), factorytest.NewSettings(), sqlstore.Config{
Provider: "sqlite",
Connection: sqlstore.ConnectionConfig{
MaxOpenConns: 1,
MaxConnLifetime: 0,
},
Sqlite: sqlstore.SqliteConfig{
Path: dbPath,
Mode: "wal",
BusyTimeout: 5 * time.Second,
TransactionMode: "deferred",
},
})
require.NoError(t, err)
_, err = store.BunDB().NewCreateTable().
Model((*tagtypes.Tag)(nil)).
IfNotExists().
Exec(context.Background())
require.NoError(t, err)
_, err = store.BunDB().Exec(`CREATE UNIQUE INDEX IF NOT EXISTS uq_tag_org_entity_lower_key_lower_value ON tag (org_id, entity_type, LOWER(key), LOWER(value))`)
require.NoError(t, err)
return store
}
var dashboardEntityType = tagtypes.MustNewEntityType("dashboard")
func tagsByLowerKeyValue(t *testing.T, db *bun.DB) map[string]*tagtypes.Tag {
t.Helper()
all := make([]*tagtypes.Tag, 0)
require.NoError(t, db.NewSelect().Model(&all).Scan(context.Background()))
out := map[string]*tagtypes.Tag{}
for _, tag := range all {
out[strings.ToLower(tag.Key)+"\x00"+strings.ToLower(tag.Value)] = tag
}
return out
}
func TestStore_Create_PopulatesIDsOnFreshInsert(t *testing.T) {
ctx := context.Background()
sqlstore := newTestStore(t)
s := NewStore(sqlstore)
orgID := valuer.GenerateUUID()
tagA := tagtypes.NewTag(orgID, dashboardEntityType, "tag", "Database", "u@signoz.io")
tagB := tagtypes.NewTag(orgID, dashboardEntityType, "team", "BLR", "u@signoz.io")
preIDA := tagA.ID
preIDB := tagB.ID
got, err := s.Create(ctx, []*tagtypes.Tag{tagA, tagB})
require.NoError(t, err)
require.Len(t, got, 2)
// No race → pre-generated IDs stand. The slice is what we passed in,
// confirming Scan didn't reallocate.
assert.Equal(t, preIDA, got[0].ID)
assert.Equal(t, preIDB, got[1].ID)
// And the rows are in the DB.
stored := tagsByLowerKeyValue(t, sqlstore.BunDB())
require.Contains(t, stored, "tag\x00database")
require.Contains(t, stored, "team\x00blr")
assert.Equal(t, preIDA, stored["tag\x00database"].ID)
assert.Equal(t, preIDB, stored["team\x00blr"].ID)
}
func TestStore_Create_ConflictReturnsExistingRowID(t *testing.T) {
ctx := context.Background()
sqlstore := newTestStore(t)
s := NewStore(sqlstore)
orgID := valuer.GenerateUUID()
// Simulate a concurrent insert: someone else has already inserted "tag:Database".
winner := tagtypes.NewTag(orgID, dashboardEntityType, "tag", "Database", "concurrent")
_, err := s.Create(ctx, []*tagtypes.Tag{winner})
require.NoError(t, err)
winnerID := winner.ID
// Now our request runs with a different pre-generated ID for the same
// (key, value) — case differs but the functional unique index collapses
// them. RETURNING should overwrite our stale ID with winner's ID.
loser := tagtypes.NewTag(orgID, dashboardEntityType, "TAG", "DATABASE", "u@signoz.io")
loserPreID := loser.ID
require.NotEqual(t, winnerID, loserPreID, "pre-generated IDs must differ for this test to be meaningful")
got, err := s.Create(ctx, []*tagtypes.Tag{loser})
require.NoError(t, err)
require.Len(t, got, 1)
assert.Equal(t, winnerID, got[0].ID, "returned slice should carry the existing row's ID, not our stale one")
assert.Equal(t, winnerID, loser.ID, "input slice element is mutated in place")
// And the DB still has exactly one row for that (lower(key), lower(value)) — winner's, with winner's casing.
stored := tagsByLowerKeyValue(t, sqlstore.BunDB())
require.Len(t, stored, 1)
assert.Equal(t, winnerID, stored["tag\x00database"].ID)
assert.Equal(t, "tag", stored["tag\x00database"].Key, "winner's casing preserved in key")
assert.Equal(t, "Database", stored["tag\x00database"].Value, "winner's casing preserved in value")
}
func TestStore_Create_MixedFreshAndConflict(t *testing.T) {
ctx := context.Background()
sqlstore := newTestStore(t)
s := NewStore(sqlstore)
orgID := valuer.GenerateUUID()
pre := tagtypes.NewTag(orgID, dashboardEntityType, "tag", "Database", "concurrent")
_, err := s.Create(ctx, []*tagtypes.Tag{pre})
require.NoError(t, err)
preExistingID := pre.ID
conflict := tagtypes.NewTag(orgID, dashboardEntityType, "tag", "Database", "u@signoz.io")
fresh := tagtypes.NewTag(orgID, dashboardEntityType, "team", "BLR", "u@signoz.io")
freshPreID := fresh.ID
got, err := s.Create(ctx, []*tagtypes.Tag{conflict, fresh})
require.NoError(t, err)
require.Len(t, got, 2)
assert.Equal(t, preExistingID, got[0].ID, "conflicting row's ID overwritten with the existing row's")
assert.Equal(t, freshPreID, got[1].ID, "fresh row's pre-generated ID is preserved")
}

View File

@@ -1,24 +0,0 @@
package tag
import (
"context"
"github.com/SigNoz/signoz/pkg/types/tagtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Module interface {
// Does not link the resolved tags to any entity — call LinkToEntity for that.
CreateMany(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType, postable []tagtypes.PostableTag, createdBy string) ([]*tagtypes.Tag, error)
// Existing rows are left untouched.
LinkToEntity(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType, entityID valuer.UUID, tagIDs []valuer.UUID) error
// missing links are inserted, obsolete ones removed.
SyncLinksForEntity(ctx context.Context, orgID valuer.UUID, entityType tagtypes.EntityType, entityID valuer.UUID, tagIDs []valuer.UUID) error
ListForEntity(ctx context.Context, entityType tagtypes.EntityType, entityID valuer.UUID) ([]*tagtypes.Tag, error)
// Entities with no tags are absent from the returned map.
ListForEntities(ctx context.Context, entityType tagtypes.EntityType, entityIDs []valuer.UUID) (map[valuer.UUID][]*tagtypes.Tag, error)
}

View File

@@ -1343,7 +1343,7 @@ func getLocalTableName(tableName string) string {
}
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, userID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
instrumentationtypes.CodeNamespace: "clickhouse-reader",
@@ -1434,6 +1434,10 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
UserAuditable: types.UserAuditable{
CreatedBy: userID,
UpdatedBy: userID,
},
TransactionID: uuid,
TableName: tableName,
TTL: int(params.DelDuration),
@@ -1511,7 +1515,7 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, userID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalTraces.StringValue(),
instrumentationtypes.CodeNamespace: "clickhouse-reader",
@@ -1572,6 +1576,10 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
UserAuditable: types.UserAuditable{
CreatedBy: userID,
UpdatedBy: userID,
},
TransactionID: uuid,
TableName: tableName,
TTL: int(params.DelDuration),
@@ -1687,7 +1695,7 @@ func (r *ClickHouseReader) hasCustomRetentionColumn(ctx context.Context) (bool,
return true, nil
}
func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error) {
func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, userID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
@@ -1718,7 +1726,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
ttlParams.ToColdStorageDuration = 0
}
ttlResult, apiErr := r.SetTTL(ctx, orgID, ttlParams)
ttlResult, apiErr := r.SetTTL(ctx, orgID, userID, ttlParams)
if apiErr != nil {
return nil, errorsV2.Wrapf(apiErr.Err, errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to set standard TTL")
}
@@ -1847,6 +1855,10 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
UserAuditable: types.UserAuditable{
CreatedBy: userID,
UpdatedBy: userID,
},
TransactionID: uuid,
TableName: tableName,
TTL: params.DefaultTTLDays,
@@ -2185,24 +2197,24 @@ func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditi
// SetTTL sets the TTL for traces or metrics or logs tables.
// This is an async API which creates goroutines to set TTL.
// Status of TTL update is tracked with ttl_status table in sqlite db.
func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, userID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
// Keep only latest 100 transactions/requests
r.deleteTtlTransactions(ctx, orgID, 100)
switch params.Type {
case constants.TraceTTL:
return r.setTTLTraces(ctx, orgID, params)
return r.setTTLTraces(ctx, orgID, userID, params)
case constants.MetricsTTL:
return r.setTTLMetrics(ctx, orgID, params)
return r.setTTLMetrics(ctx, orgID, userID, params)
case constants.LogsTTL:
return r.setTTLLogs(ctx, orgID, params)
return r.setTTLLogs(ctx, orgID, userID, params)
default:
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v", params.Type)}
}
}
func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, userID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalMetrics.StringValue(),
instrumentationtypes.CodeNamespace: "clickhouse-reader",
@@ -2244,6 +2256,10 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
UserAuditable: types.UserAuditable{
CreatedBy: userID,
UpdatedBy: userID,
},
TransactionID: uuid,
TableName: tableName,
TTL: int(params.DelDuration),

View File

@@ -1655,7 +1655,7 @@ func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) {
}
// Context is not used here as TTL is long duration DB operation
result, apiErr := aH.reader.SetTTL(context.Background(), claims.OrgID, ttlParams)
result, apiErr := aH.reader.SetTTL(context.Background(), claims.OrgID, claims.UserID, ttlParams)
if apiErr != nil {
if apiErr.Typ == model.ErrorConflict {
aH.HandleError(w, apiErr.Err, http.StatusConflict)
@@ -1684,7 +1684,7 @@ func (aH *APIHandler) setCustomRetentionTTL(w http.ResponseWriter, r *http.Reque
}
// Context is not used here as TTL is long duration DB operation
result, apiErr := aH.reader.SetTTLV2(context.Background(), claims.OrgID, &params)
result, apiErr := aH.reader.SetTTLV2(context.Background(), claims.OrgID, claims.UserID, &params)
if apiErr != nil {
render.Error(w, errorsV2.New(errorsV2.TypeInvalidInput, errorsV2.CodeInternal, apiErr.Error()))
return

View File

@@ -46,8 +46,8 @@ type Reader interface {
GetFlamegraphSpansForTrace(ctx context.Context, orgID valuer.UUID, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, error)
// Setter Interfaces
SetTTL(ctx context.Context, orgID string, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error)
SetTTL(ctx context.Context, orgID string, userID string, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
SetTTLV2(ctx context.Context, orgID string, userID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error)
FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error)
GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)

View File

@@ -16,7 +16,6 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/modules/dashboard/impldashboard"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/tag/impltag"
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/queryparser"
@@ -45,7 +44,6 @@ func TestNewHandlers(t *testing.T) {
emailing := emailingtest.New()
queryParser := queryparser.New(providerSettings)
require.NoError(t, err)
tagModule := impltag.NewModule(impltag.NewStore(sqlstore))
dashboardModule := impldashboard.NewModule(impldashboard.NewStore(sqlstore), providerSettings, nil, orgGetter, queryParser)
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
@@ -54,7 +52,7 @@ func TestNewHandlers(t *testing.T) {
userRoleStore := impluser.NewUserRoleStore(sqlstore, providerSettings)
userGetter := impluser.NewGetter(impluser.NewStore(sqlstore, providerSettings), userRoleStore, flagger)
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, nil, nil, flagger, tagModule)
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, nil, nil, flagger)
querierHandler := querier.NewHandler(providerSettings, nil, nil)
registryHandler := factory.NewHandler(nil)

View File

@@ -40,7 +40,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/session/implsession"
"github.com/SigNoz/signoz/pkg/modules/spanpercentile"
"github.com/SigNoz/signoz/pkg/modules/spanpercentile/implspanpercentile"
"github.com/SigNoz/signoz/pkg/modules/tag"
"github.com/SigNoz/signoz/pkg/modules/tracedetail"
"github.com/SigNoz/signoz/pkg/modules/tracedetail/impltracedetail"
"github.com/SigNoz/signoz/pkg/modules/tracefunnel"
@@ -81,7 +80,6 @@ type Modules struct {
CloudIntegration cloudintegration.Module
RuleStateHistory rulestatehistory.Module
TraceDetail tracedetail.Module
Tag tag.Module
}
func NewModules(
@@ -106,7 +104,6 @@ func NewModules(
serviceAccount serviceaccount.Module,
cloudIntegrationModule cloudintegration.Module,
fl flagger.Flagger,
tagModule tag.Module,
) Modules {
quickfilter := implquickfilter.NewModule(implquickfilter.NewStore(sqlstore))
orgSetter := implorganization.NewSetter(implorganization.NewStore(sqlstore), alertmanager, quickfilter)
@@ -136,6 +133,5 @@ func NewModules(
RuleStateHistory: implrulestatehistory.NewModule(implrulestatehistory.NewStore(telemetryStore, telemetryMetadataStore, providerSettings.Logger)),
CloudIntegration: cloudIntegrationModule,
TraceDetail: impltracedetail.NewModule(impltracedetail.NewTraceStore(telemetryStore), providerSettings, config.TraceDetail),
Tag: tagModule,
}
}

View File

@@ -18,7 +18,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount/implserviceaccount"
"github.com/SigNoz/signoz/pkg/modules/tag/impltag"
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/sharder"
@@ -46,7 +45,6 @@ func TestNewModules(t *testing.T) {
emailing := emailingtest.New()
queryParser := queryparser.New(providerSettings)
require.NoError(t, err)
tagModule := impltag.NewModule(impltag.NewStore(sqlstore))
dashboardModule := impldashboard.NewModule(impldashboard.NewStore(sqlstore), providerSettings, nil, orgGetter, queryParser)
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
@@ -58,7 +56,7 @@ func TestNewModules(t *testing.T) {
serviceAccount := implserviceaccount.NewModule(implserviceaccount.NewStore(sqlstore), nil, nil, nil, providerSettings, serviceaccount.Config{})
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, serviceAccount, implcloudintegration.NewModule(), flagger, tagModule)
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, serviceAccount, implcloudintegration.NewModule(), flagger)
reflectVal := reflect.ValueOf(modules)
for i := 0; i < reflectVal.NumField(); i++ {

View File

@@ -196,7 +196,7 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewDropUserDeletedAtFactory(sqlstore, sqlschema),
sqlmigration.NewMigrateAWSAllRegionsFactory(sqlstore),
sqlmigration.NewAddServiceAccountManagedRoleTransactionsFactory(sqlstore),
sqlmigration.NewAddTagsFactory(sqlstore, sqlschema),
sqlmigration.NewUpdateTTLSettingUserAuditFactory(sqlstore, sqlschema),
)
}

View File

@@ -29,7 +29,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/rulestatehistory"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount/implserviceaccount"
"github.com/SigNoz/signoz/pkg/modules/tag/impltag"
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
@@ -326,11 +325,6 @@ func New(
// Initialize query parser (needed for dashboard module)
queryParser := queryparser.New(providerSettings)
// Initialize tag module — shared across modules that link entities to tags
// (currently dashboard; future: alerts, RBAC). Built once here and injected
// where needed.
tagModule := impltag.NewModule(impltag.NewStore(sqlstore))
// Initialize dashboard module
dashboard := dashboardModuleCallback(sqlstore, providerSettings, analytics, orgGetter, queryParser, querier, licensing)
@@ -447,7 +441,7 @@ func New(
}
// Initialize all modules
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, analytics, querier, telemetrystore, telemetryMetadataStore, authNs, authz, cache, queryParser, config, dashboard, userGetter, userRoleStore, serviceAccount, cloudIntegrationModule, flagger, tagModule)
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, analytics, querier, telemetrystore, telemetryMetadataStore, authNs, authz, cache, queryParser, config, dashboard, userGetter, userRoleStore, serviceAccount, cloudIntegrationModule, flagger)
// Initialize ruler from the variant-specific provider factories
rulerInstance, err := factory.NewProviderFromNamedMap(ctx, providerSettings, config.Ruler, rulerProviderFactories(cache, alertmanager, sqlstore, telemetrystore, telemetryMetadataStore, prometheus, orgGetter, modules.RuleStateHistory, querier, queryParser), "signoz")

View File

@@ -1,102 +0,0 @@
package sqlmigration
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type addTags struct {
sqlstore sqlstore.SQLStore
sqlschema sqlschema.SQLSchema
}
func NewAddTagsFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("add_tags"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return &addTags{
sqlstore: sqlstore,
sqlschema: sqlschema,
}, nil
})
}
func (migration *addTags) Register(migrations *migrate.Migrations) error {
return migrations.Register(migration.Up, migration.Down)
}
func (migration *addTags) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
sqls := [][]byte{}
tagTableSQLs := migration.sqlschema.Operator().CreateTable(&sqlschema.Table{
Name: "tag",
Columns: []*sqlschema.Column{
{Name: "id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "key", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "value", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "org_id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "entity_type", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "created_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
{Name: "created_by", DataType: sqlschema.DataTypeText, Nullable: true},
{Name: "updated_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
{Name: "updated_by", DataType: sqlschema.DataTypeText, Nullable: true},
},
PrimaryKeyConstraint: &sqlschema.PrimaryKeyConstraint{ColumnNames: []sqlschema.ColumnName{"id"}},
ForeignKeyConstraints: []*sqlschema.ForeignKeyConstraint{
{
ReferencingColumnName: sqlschema.ColumnName("org_id"),
ReferencedTableName: sqlschema.TableName("organizations"),
ReferencedColumnName: sqlschema.ColumnName("id"),
},
},
})
sqls = append(sqls, tagTableSQLs...)
// Functional unique index: case-insensitive uniqueness on (org_id, entity_type, key, value).
// sqlschema.UniqueIndex doesn't support expressions, so emit raw SQL — both
// Postgres and SQLite (modernc 3.50.x) support expression indexes.
sqls = append(sqls, []byte(`CREATE UNIQUE INDEX IF NOT EXISTS uq_tag_org_entity_lower_key_lower_value ON tag (org_id, entity_type, LOWER(key), LOWER(value))`))
tagRelationsTableSQLs := migration.sqlschema.Operator().CreateTable(&sqlschema.Table{
Name: "tag_relations",
Columns: []*sqlschema.Column{
{Name: "entity_type", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "entity_id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "tag_id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "org_id", DataType: sqlschema.DataTypeText, Nullable: false},
},
PrimaryKeyConstraint: &sqlschema.PrimaryKeyConstraint{ColumnNames: []sqlschema.ColumnName{"entity_type", "entity_id", "tag_id"}},
ForeignKeyConstraints: []*sqlschema.ForeignKeyConstraint{
{
ReferencingColumnName: sqlschema.ColumnName("org_id"),
ReferencedTableName: sqlschema.TableName("organizations"),
ReferencedColumnName: sqlschema.ColumnName("id"),
},
},
})
sqls = append(sqls, tagRelationsTableSQLs...)
for _, sql := range sqls {
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
return err
}
}
return tx.Commit()
}
func (migration *addTags) Down(_ context.Context, _ *bun.DB) error {
return nil
}

View File

@@ -0,0 +1,84 @@
package sqlmigration
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type updateTTLSettingUserAudit struct {
sqlstore sqlstore.SQLStore
sqlschema sqlschema.SQLSchema
}
func NewUpdateTTLSettingUserAuditFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("update_ttl_setting_user_audit"), func(ctx context.Context, providerSettings factory.ProviderSettings, config Config) (SQLMigration, error) {
return newUpdateTTLSettingUserAudit(ctx, providerSettings, config, sqlstore, sqlschema)
})
}
func newUpdateTTLSettingUserAudit(_ context.Context, _ factory.ProviderSettings, _ Config, sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) (SQLMigration, error) {
return &updateTTLSettingUserAudit{
sqlstore: sqlstore,
sqlschema: sqlschema,
}, nil
}
func (migration *updateTTLSettingUserAudit) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *updateTTLSettingUserAudit) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
table, uniqueConstraints, err := migration.sqlschema.GetTable(ctx, sqlschema.TableName("ttl_setting"))
if err != nil {
return err
}
columns := []*sqlschema.Column{
{
Name: sqlschema.ColumnName("created_by"),
DataType: sqlschema.DataTypeText,
Nullable: true,
},
{
Name: sqlschema.ColumnName("updated_by"),
DataType: sqlschema.DataTypeText,
Nullable: true,
},
}
for _, column := range columns {
sqls := migration.sqlschema.Operator().AddColumn(table, uniqueConstraints, column, nil)
for _, sql := range sqls {
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
return err
}
}
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}
func (migration *updateTTLSettingUserAudit) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@@ -1,99 +0,0 @@
package inframonitoringtypes
import (
"encoding/json"
"slices"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type Namespaces struct {
Type ResponseType `json:"type" required:"true"`
Records []NamespaceRecord `json:"records" required:"true"`
Total int `json:"total" required:"true"`
RequiredMetricsCheck RequiredMetricsCheck `json:"requiredMetricsCheck" required:"true"`
EndTimeBeforeRetention bool `json:"endTimeBeforeRetention" required:"true"`
Warning *qbtypes.QueryWarnData `json:"warning,omitempty"`
}
type NamespaceRecord struct {
NamespaceName string `json:"namespaceName" required:"true"`
NamespaceCPU float64 `json:"namespaceCPU" required:"true"`
NamespaceMemory float64 `json:"namespaceMemory" required:"true"`
PodCountsByPhase PodCountsByPhase `json:"podCountsByPhase" required:"true"`
Meta map[string]string `json:"meta" required:"true"`
}
// PostableNamespaces is the request body for the v2 namespaces list API.
type PostableNamespaces struct {
Start int64 `json:"start" required:"true"`
End int64 `json:"end" required:"true"`
Filter *qbtypes.Filter `json:"filter"`
GroupBy []qbtypes.GroupByKey `json:"groupBy"`
OrderBy *qbtypes.OrderBy `json:"orderBy"`
Offset int `json:"offset"`
Limit int `json:"limit" required:"true"`
}
// Validate ensures PostableNamespaces contains acceptable values.
func (req *PostableNamespaces) Validate() error {
if req == nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")
}
if req.Start <= 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid start time %d: start must be greater than 0",
req.Start,
)
}
if req.End <= 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid end time %d: end must be greater than 0",
req.End,
)
}
if req.Start >= req.End {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid time range: start (%d) must be less than end (%d)",
req.Start,
req.End,
)
}
if req.Limit < 1 || req.Limit > 5000 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be between 1 and 5000")
}
if req.Offset < 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "offset cannot be negative")
}
if req.OrderBy != nil {
if !slices.Contains(NamespacesValidOrderByKeys, req.OrderBy.Key.Name) {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order by key: %s", req.OrderBy.Key.Name)
}
if req.OrderBy.Direction != qbtypes.OrderDirectionAsc && req.OrderBy.Direction != qbtypes.OrderDirectionDesc {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order by direction: %s", req.OrderBy.Direction)
}
}
return nil
}
// UnmarshalJSON validates input immediately after decoding.
func (req *PostableNamespaces) UnmarshalJSON(data []byte) error {
type raw PostableNamespaces
var decoded raw
if err := json.Unmarshal(data, &decoded); err != nil {
return err
}
*req = PostableNamespaces(decoded)
return req.Validate()
}

View File

@@ -1,11 +0,0 @@
package inframonitoringtypes
const (
NamespacesOrderByCPU = "cpu"
NamespacesOrderByMemory = "memory"
)
var NamespacesValidOrderByKeys = []string{
NamespacesOrderByCPU,
NamespacesOrderByMemory,
}

View File

@@ -1,237 +0,0 @@
package inframonitoringtypes
import (
"testing"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestPostableNamespaces_Validate(t *testing.T) {
tests := []struct {
name string
req *PostableNamespaces
wantErr bool
}{
{
name: "valid request",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: false,
},
{
name: "nil request",
req: nil,
wantErr: true,
},
{
name: "start time zero",
req: &PostableNamespaces{
Start: 0,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time negative",
req: &PostableNamespaces{
Start: -1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "end time zero",
req: &PostableNamespaces{
Start: 1000,
End: 0,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time greater than end time",
req: &PostableNamespaces{
Start: 2000,
End: 1000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time equal to end time",
req: &PostableNamespaces{
Start: 1000,
End: 1000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "limit zero",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 0,
Offset: 0,
},
wantErr: true,
},
{
name: "limit negative",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: -10,
Offset: 0,
},
wantErr: true,
},
{
name: "limit exceeds max",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 5001,
Offset: 0,
},
wantErr: true,
},
{
name: "offset negative",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: -5,
},
wantErr: true,
},
{
name: "orderBy nil is valid",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: false,
},
{
name: "orderBy with valid key cpu and direction asc",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: NamespacesOrderByCPU,
},
},
Direction: qbtypes.OrderDirectionAsc,
},
},
wantErr: false,
},
{
name: "orderBy with valid key memory and direction desc",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: NamespacesOrderByMemory,
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
wantErr: false,
},
{
name: "orderBy with pod_phase key is rejected",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "pod_phase",
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
wantErr: true,
},
{
name: "orderBy with invalid key",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "unknown",
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
wantErr: true,
},
{
name: "orderBy with valid key but invalid direction",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: NamespacesOrderByMemory,
},
},
Direction: qbtypes.OrderDirection{String: valuer.NewString("invalid")},
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.req.Validate()
if tt.wantErr {
require.Error(t, err)
require.True(t, errors.Ast(err, errors.TypeInvalidInput), "expected error to be of type InvalidInput")
} else {
require.NoError(t, err)
}
})
}
}

View File

@@ -9,6 +9,7 @@ type TTLSetting struct {
bun.BaseModel `bun:"table:ttl_setting"`
types.Identifiable
types.TimeAuditable
types.UserAuditable
TransactionID string `bun:"transaction_id,type:text,notnull"`
TableName string `bun:"table_name,type:text,notnull"`
TTL int `bun:"ttl,notnull,default:0"`

View File

@@ -1,27 +0,0 @@
package tagtypes
import (
"context"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Store interface {
List(ctx context.Context, orgID valuer.UUID, entityType EntityType) ([]*Tag, error)
ListByEntity(ctx context.Context, entityType EntityType, entityID valuer.UUID) ([]*Tag, error)
ListByEntities(ctx context.Context, entityType EntityType, entityIDs []valuer.UUID) (map[valuer.UUID][]*Tag, error)
// Create upserts the given tags and returns them with authoritative IDs.
// On conflict on (org_id, entity_type, LOWER(key), LOWER(value)) — which
// happens only when a concurrent insert raced ours, including casing-only
// collisions — the returned entry carries the existing row's ID rather
// than the pre-generated one in the input.
Create(ctx context.Context, tags []*Tag) ([]*Tag, error)
// CreateRelations inserts tag-entity relations. Conflicts on the composite primary key are ignored.
CreateRelations(ctx context.Context, relations []*TagRelation) error
DeleteRelationsExcept(ctx context.Context, entityType EntityType, entityID valuer.UUID, keepTagIDs []valuer.UUID) error
}

View File

@@ -1,137 +0,0 @@
package tagtypes
import (
"context"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
var (
ErrCodeTagInvalidName = errors.MustNewCode("tag_invalid_name")
ErrCodeTagNotFound = errors.MustNewCode("tag_not_found")
)
type Tag struct {
bun.BaseModel `bun:"table:tag,alias:tag"`
types.Identifiable
types.TimeAuditable
types.UserAuditable
Key string `json:"key" required:"true" bun:"key,type:text,notnull"`
Value string `json:"value" required:"true" bun:"value,type:text,notnull"`
OrgID valuer.UUID `json:"orgId" required:"true" bun:"org_id,type:text,notnull"`
EntityType EntityType `json:"entityType" required:"true" bun:"entity_type,type:text,notnull"`
}
type PostableTag struct {
Key string `json:"key" required:"true"`
Value string `json:"value" required:"true"`
}
type GettableTag = PostableTag
func NewGettableTagFromTag(tag *Tag) *GettableTag {
return &GettableTag{Key: tag.Key, Value: tag.Value}
}
func NewGettableTagsFromTags(tags []*Tag) []*GettableTag {
out := make([]*GettableTag, len(tags))
for i, t := range tags {
out[i] = NewGettableTagFromTag(t)
}
return out
}
func NewTag(orgID valuer.UUID, entityType EntityType, key, value, createdBy string) *Tag {
now := time.Now()
return &Tag{
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
TimeAuditable: types.TimeAuditable{
CreatedAt: now,
UpdatedAt: now,
},
UserAuditable: types.UserAuditable{
CreatedBy: createdBy,
UpdatedBy: createdBy,
},
Key: key,
Value: value,
OrgID: orgID,
EntityType: entityType,
}
}
// Resolve canonicalizes a batch of user-supplied (key, value) tag pairs against
// the existing tags for an org. Lookup is case-insensitive on both key and
// value (matching the storage uniqueness rule); when an existing row matches,
// its display casing is reused. Inputs are deduped on (LOWER(key), LOWER(value));
// the first input's casing wins on collisions. Returns:
// - toCreate: new Tag rows the caller should insert (with pre-generated IDs)
// - matched: existing rows the caller's input already pointed to. They
// already carry authoritative IDs from the store.
func Resolve(ctx context.Context, store Store, orgID valuer.UUID, entityType EntityType, postable []PostableTag, createdBy string) ([]*Tag, []*Tag, error) {
if len(postable) == 0 {
return nil, nil, nil
}
existing, err := store.List(ctx, orgID, entityType)
if err != nil {
return nil, nil, err
}
lowercaseTagsMap := make(map[string]*Tag, len(existing))
for _, t := range existing {
mapKey := strings.ToLower(t.Key) + "\x00" + strings.ToLower(t.Value)
lowercaseTagsMap[mapKey] = t
}
seenInRequestAlready := make(map[string]struct{}, len(postable)) // postable can have the same tag multiple times
toCreate := make([]*Tag, 0)
matched := make([]*Tag, 0)
for _, p := range postable {
key, value, err := validatePostableTag(p)
if err != nil {
return nil, nil, err
}
lookup := strings.ToLower(key) + "\x00" + strings.ToLower(value)
if _, dup := seenInRequestAlready[lookup]; dup {
continue
}
seenInRequestAlready[lookup] = struct{}{}
if existingTag, ok := lowercaseTagsMap[lookup]; ok {
matched = append(matched, existingTag)
continue
}
toCreate = append(toCreate, NewTag(orgID, entityType, key, value, createdBy))
}
return toCreate, matched, nil
}
// Entity-specific reserved-key checks (e.g. dashboard column names that would
// collide with the list-query DSL) are the caller's responsibility — perform
// them before calling into the tag module.
func validatePostableTag(p PostableTag) (string, string, error) {
key := strings.TrimSpace(p.Key)
value := strings.TrimSpace(p.Value)
if key == "" {
return "", "", errors.Newf(errors.TypeInvalidInput, ErrCodeTagInvalidName, "tag key cannot be empty")
}
if value == "" {
return "", "", errors.Newf(errors.TypeInvalidInput, ErrCodeTagInvalidName, "tag value cannot be empty")
}
if strings.ContainsRune(key, '/') {
return "", "", errors.Newf(errors.TypeInvalidInput, ErrCodeTagInvalidName, "tag key %q cannot contain '/'", key)
}
if strings.ContainsRune(value, '/') {
return "", "", errors.Newf(errors.TypeInvalidInput, ErrCodeTagInvalidName, "tag value %q cannot contain '/'", value)
}
return key, value, nil
}

View File

@@ -1,38 +0,0 @@
package tagtypes
import (
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
type EntityType struct{ valuer.String }
func MustNewEntityType(name string) EntityType {
return EntityType{valuer.NewString(name)}
}
type TagRelation struct {
bun.BaseModel `bun:"table:tag_relations,alias:tag_relations"`
EntityType EntityType `json:"entityType" required:"true" bun:"entity_type,type:text,notnull"`
EntityID valuer.UUID `json:"entityId" required:"true" bun:"entity_id,pk,type:text,notnull"`
TagID valuer.UUID `json:"tagId" required:"true" bun:"tag_id,pk,type:text,notnull"`
OrgID valuer.UUID `json:"orgId" required:"true" bun:"org_id,type:text,notnull"`
}
func NewTagRelation(orgID valuer.UUID, entityType EntityType, entityID valuer.UUID, tagID valuer.UUID) *TagRelation {
return &TagRelation{
EntityType: entityType,
EntityID: entityID,
TagID: tagID,
OrgID: orgID,
}
}
func NewTagRelations(orgID valuer.UUID, entityType EntityType, entityID valuer.UUID, tagIDs []valuer.UUID) []*TagRelation {
relations := make([]*TagRelation, 0, len(tagIDs))
for _, tagID := range tagIDs {
relations = append(relations, NewTagRelation(orgID, entityType, entityID, tagID))
}
return relations
}

View File

@@ -1,166 +0,0 @@
package tagtypes
import (
"context"
"strings"
"testing"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestValidatePostableTag(t *testing.T) {
tests := []struct {
name string
input PostableTag
wantKey string
wantValue string
wantError bool
}{
{name: "simple pair", input: PostableTag{Key: "team", Value: "pulse"}, wantKey: "team", wantValue: "pulse"},
{name: "preserves casing", input: PostableTag{Key: "Team", Value: "Pulse"}, wantKey: "Team", wantValue: "Pulse"},
{name: "trims key", input: PostableTag{Key: " team ", Value: "pulse"}, wantKey: "team", wantValue: "pulse"},
{name: "trims value", input: PostableTag{Key: "team", Value: " pulse "}, wantKey: "team", wantValue: "pulse"},
{name: "empty key rejected", input: PostableTag{Key: "", Value: "pulse"}, wantError: true},
{name: "empty value rejected", input: PostableTag{Key: "team", Value: ""}, wantError: true},
{name: "whitespace-only key rejected", input: PostableTag{Key: " ", Value: "pulse"}, wantError: true},
{name: "whitespace-only value rejected", input: PostableTag{Key: "team", Value: " "}, wantError: true},
{name: "slash in key rejected", input: PostableTag{Key: "team/eng", Value: "pulse"}, wantError: true},
{name: "slash in value rejected", input: PostableTag{Key: "team", Value: "pulse/events"}, wantError: true},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
gotKey, gotValue, err := validatePostableTag(tc.input)
if tc.wantError {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.Equal(t, tc.wantKey, gotKey)
assert.Equal(t, tc.wantValue, gotValue)
})
}
}
var testEntityType = MustNewEntityType("dashboard")
type fakeStore struct {
tags []*Tag
listCallCount int
}
func (f *fakeStore) List(_ context.Context, _ valuer.UUID, _ EntityType) ([]*Tag, error) {
f.listCallCount++
out := make([]*Tag, len(f.tags))
copy(out, f.tags)
return out, nil
}
func (f *fakeStore) Create(_ context.Context, tags []*Tag) ([]*Tag, error) {
return tags, nil
}
func (f *fakeStore) CreateRelations(_ context.Context, _ []*TagRelation) error {
return nil
}
func (f *fakeStore) ListByEntity(_ context.Context, _ EntityType, _ valuer.UUID) ([]*Tag, error) {
return []*Tag{}, nil
}
func (f *fakeStore) ListByEntities(_ context.Context, _ EntityType, _ []valuer.UUID) (map[valuer.UUID][]*Tag, error) {
return map[valuer.UUID][]*Tag{}, nil
}
func (f *fakeStore) DeleteRelationsExcept(_ context.Context, _ EntityType, _ valuer.UUID, _ []valuer.UUID) error {
return nil
}
func TestResolve(t *testing.T) {
t.Run("empty input does not hit store", func(t *testing.T) {
store := &fakeStore{}
toCreate, matched, err := Resolve(context.Background(), store, valuer.GenerateUUID(), testEntityType, nil, "u@signoz.io")
require.NoError(t, err)
assert.Empty(t, toCreate)
assert.Empty(t, matched)
assert.Zero(t, store.listCallCount, "should not hit store when input is empty")
})
t.Run("creates missing pairs and reuses existing", func(t *testing.T) {
orgID := valuer.GenerateUUID()
dbTag := NewTag(orgID, testEntityType, "team", "Pulse", "seed")
dbTag2 := NewTag(orgID, testEntityType, "Database", "redis", "seed")
store := &fakeStore{tags: []*Tag{dbTag, dbTag2}}
toCreate, matched, err := Resolve(context.Background(), store, orgID, testEntityType, []PostableTag{
{Key: "team", Value: "events"}, // new
{Key: "DATABASE", Value: "REDIS"}, // case-only conflict
{Key: "Brand", Value: "New"}, // new
}, "u@signoz.io")
require.NoError(t, err)
createdLowerKVs := []string{}
for _, tg := range toCreate {
createdLowerKVs = append(createdLowerKVs, strings.ToLower(tg.Key)+"\x00"+strings.ToLower(tg.Value))
}
assert.ElementsMatch(t, []string{"team\x00events", "brand\x00new"}, createdLowerKVs,
"only the two missing pairs should be returned for insertion")
require.Len(t, matched, 1, "DATABASE:REDIS should hit the existing 'Database:redis' tag")
assert.Same(t, dbTag2, matched[0], "matched should return the existing pointer with its authoritative ID")
})
t.Run("dedupes inputs that map to the same lower(key)+lower(value)", func(t *testing.T) {
orgID := valuer.GenerateUUID()
store := &fakeStore{}
toCreate, matched, err := Resolve(context.Background(), store, orgID, testEntityType, []PostableTag{
{Key: "Foo", Value: "Bar"},
{Key: "foo", Value: "bar"},
{Key: "FOO", Value: "BAR"},
}, "u@signoz.io")
require.NoError(t, err)
require.Empty(t, matched)
require.Len(t, toCreate, 1, "duplicate inputs must collapse into a single insert")
assert.Equal(t, "Foo", toCreate[0].Key, "first input's casing wins")
assert.Equal(t, "Bar", toCreate[0].Value, "first input's casing wins")
})
t.Run("preserves existing casing on case-only match", func(t *testing.T) {
orgID := valuer.GenerateUUID()
dbTag := NewTag(orgID, testEntityType, "Team", "Pulse", "seed")
store := &fakeStore{tags: []*Tag{dbTag}}
toCreate, matched, err := Resolve(context.Background(), store, orgID, testEntityType, []PostableTag{
{Key: "team", Value: "PULSE"},
}, "u@signoz.io")
require.NoError(t, err)
assert.Empty(t, toCreate)
require.Len(t, matched, 1)
assert.Equal(t, "Team", matched[0].Key)
assert.Equal(t, "Pulse", matched[0].Value)
})
t.Run("propagates validation error from any input", func(t *testing.T) {
store := &fakeStore{}
_, _, err := Resolve(context.Background(), store, valuer.GenerateUUID(), testEntityType, []PostableTag{
{Key: "team", Value: "pulse"},
{Key: "", Value: "x"},
}, "u@signoz.io")
require.Error(t, err)
})
t.Run("propagates slash validation error", func(t *testing.T) {
store := &fakeStore{}
_, _, err := Resolve(context.Background(), store, valuer.GenerateUUID(), testEntityType, []PostableTag{
{Key: "team/eng", Value: "pulse"},
}, "u@signoz.io")
require.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "/"), "error should reference the disallowed character")
})
}