mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-10 22:20:20 +01:00
Compare commits
42 Commits
debug-wal
...
feat/json-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9176ef0589 | ||
|
|
5c2a338189 | ||
|
|
7279c5f770 | ||
|
|
e543776efc | ||
|
|
704bab23cf | ||
|
|
371da26b3c | ||
|
|
621127b7fb | ||
|
|
97fbfbdc13 | ||
|
|
4b112988ef | ||
|
|
a47ecf3907 | ||
|
|
e4a78cf556 | ||
|
|
b6adecc294 | ||
|
|
40333a5fee | ||
|
|
4af6a9abae | ||
|
|
55e892dad3 | ||
|
|
181116308f | ||
|
|
eaa678910b | ||
|
|
e994caeb02 | ||
|
|
10840f8495 | ||
|
|
1fcd3adfc8 | ||
|
|
3e14b26b00 | ||
|
|
b30bfa6371 | ||
|
|
e7f4a04b36 | ||
|
|
0687634da3 | ||
|
|
7e7732243e | ||
|
|
2f952e402f | ||
|
|
a12febca4a | ||
|
|
cb71c9c3f7 | ||
|
|
1cd4ce6509 | ||
|
|
9299c8ab18 | ||
|
|
24749de269 | ||
|
|
39098ec3f4 | ||
|
|
fe554f5c94 | ||
|
|
8a60a041a6 | ||
|
|
541f19c34a | ||
|
|
010db03d6e | ||
|
|
5408acbd8c | ||
|
|
0de6c85f81 | ||
|
|
69ec24fa05 | ||
|
|
539d732b65 | ||
|
|
843d5fb199 | ||
|
|
fabdfb8cc1 |
3
.github/workflows/integrationci.yaml
vendored
3
.github/workflows/integrationci.yaml
vendored
@@ -52,6 +52,7 @@ jobs:
|
||||
- ingestionkeys
|
||||
- rootuser
|
||||
- serviceaccount
|
||||
- querier_json_body
|
||||
sqlstore-provider:
|
||||
- postgres
|
||||
- sqlite
|
||||
@@ -61,7 +62,7 @@ jobs:
|
||||
- 25.5.6
|
||||
- 25.12.5
|
||||
schema-migrator-version:
|
||||
- v0.142.0
|
||||
- v0.144.2
|
||||
postgres-version:
|
||||
- 15
|
||||
if: |
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/cmd"
|
||||
"github.com/SigNoz/signoz/pkg/analytics"
|
||||
"github.com/SigNoz/signoz/pkg/auditor"
|
||||
"github.com/SigNoz/signoz/pkg/authn"
|
||||
"github.com/SigNoz/signoz/pkg/authz"
|
||||
"github.com/SigNoz/signoz/pkg/authz/openfgaauthz"
|
||||
@@ -93,6 +94,9 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
|
||||
func(_ licensing.Licensing) factory.ProviderFactory[gateway.Gateway, gateway.Config] {
|
||||
return noopgateway.NewProviderFactory()
|
||||
},
|
||||
func(_ licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]] {
|
||||
return signoz.NewAuditorProviderFactories()
|
||||
},
|
||||
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
|
||||
return querier.NewHandler(ps, q, a)
|
||||
},
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/SigNoz/signoz/cmd"
|
||||
"github.com/SigNoz/signoz/ee/auditor/otlphttpauditor"
|
||||
"github.com/SigNoz/signoz/ee/authn/callbackauthn/oidccallbackauthn"
|
||||
"github.com/SigNoz/signoz/ee/authn/callbackauthn/samlcallbackauthn"
|
||||
"github.com/SigNoz/signoz/ee/authz/openfgaauthz"
|
||||
@@ -24,6 +25,7 @@ import (
|
||||
enterprisezeus "github.com/SigNoz/signoz/ee/zeus"
|
||||
"github.com/SigNoz/signoz/ee/zeus/httpzeus"
|
||||
"github.com/SigNoz/signoz/pkg/analytics"
|
||||
"github.com/SigNoz/signoz/pkg/auditor"
|
||||
"github.com/SigNoz/signoz/pkg/authn"
|
||||
"github.com/SigNoz/signoz/pkg/authz"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
@@ -133,6 +135,13 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
|
||||
func(licensing licensing.Licensing) factory.ProviderFactory[gateway.Gateway, gateway.Config] {
|
||||
return httpgateway.NewProviderFactory(licensing)
|
||||
},
|
||||
func(licensing licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]] {
|
||||
factories := signoz.NewAuditorProviderFactories()
|
||||
if err := factories.Add(otlphttpauditor.NewFactory(licensing, version.Info)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return factories
|
||||
},
|
||||
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
|
||||
communityHandler := querier.NewHandler(ps, q, a)
|
||||
return eequerier.NewHandler(ps, q, communityHandler)
|
||||
|
||||
@@ -364,3 +364,34 @@ serviceaccount:
|
||||
analytics:
|
||||
# toggle service account analytics
|
||||
enabled: true
|
||||
|
||||
##################### Auditor #####################
|
||||
auditor:
|
||||
# Specifies the auditor provider to use.
|
||||
# noop: discards all audit events (community default).
|
||||
# otlphttp: exports audit events via OTLP HTTP (enterprise).
|
||||
provider: noop
|
||||
# The async channel capacity for audit events. Events are dropped when full (fail-open).
|
||||
buffer_size: 1000
|
||||
# The maximum number of events per export batch.
|
||||
batch_size: 100
|
||||
# The maximum time between export flushes.
|
||||
flush_interval: 1s
|
||||
otlphttp:
|
||||
# The target scheme://host:port/path of the OTLP HTTP endpoint.
|
||||
endpoint: http://localhost:4318/v1/logs
|
||||
# Whether to use HTTP instead of HTTPS.
|
||||
insecure: false
|
||||
# The maximum duration for an export attempt.
|
||||
timeout: 10s
|
||||
# Additional HTTP headers sent with every export request.
|
||||
headers: {}
|
||||
retry:
|
||||
# Whether to retry on transient failures.
|
||||
enabled: true
|
||||
# The initial wait time before the first retry.
|
||||
initial_interval: 5s
|
||||
# The upper bound on backoff interval.
|
||||
max_interval: 30s
|
||||
# The total maximum time spent retrying.
|
||||
max_elapsed_time: 60s
|
||||
|
||||
@@ -3309,7 +3309,7 @@ paths:
|
||||
schema:
|
||||
$ref: '#/components/schemas/CloudintegrationtypesPostableAccount'
|
||||
responses:
|
||||
"200":
|
||||
"201":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
@@ -3322,7 +3322,7 @@ paths:
|
||||
- status
|
||||
- data
|
||||
type: object
|
||||
description: OK
|
||||
description: Created
|
||||
"401":
|
||||
content:
|
||||
application/json:
|
||||
@@ -3683,6 +3683,11 @@ paths:
|
||||
provider
|
||||
operationId: ListServicesMetadata
|
||||
parameters:
|
||||
- in: query
|
||||
name: cloud_integration_id
|
||||
required: false
|
||||
schema:
|
||||
type: string
|
||||
- in: path
|
||||
name: cloud_provider
|
||||
required: true
|
||||
@@ -3735,6 +3740,11 @@ paths:
|
||||
description: This endpoint gets a service for the specified cloud provider
|
||||
operationId: GetService
|
||||
parameters:
|
||||
- in: query
|
||||
name: cloud_integration_id
|
||||
required: false
|
||||
schema:
|
||||
type: string
|
||||
- in: path
|
||||
name: cloud_provider
|
||||
required: true
|
||||
|
||||
@@ -227,7 +227,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
|
||||
s.config.APIServer.Timeout.Default,
|
||||
s.config.APIServer.Timeout.Max,
|
||||
).Wrap)
|
||||
r.Use(middleware.NewAudit(s.signoz.Instrumentation.Logger(), s.config.APIServer.Logging.ExcludedRoutes, nil).Wrap)
|
||||
r.Use(middleware.NewAudit(s.signoz.Instrumentation.Logger(), s.config.APIServer.Logging.ExcludedRoutes, s.signoz.Auditor).Wrap)
|
||||
r.Use(middleware.NewComment().Wrap)
|
||||
|
||||
apiHandler.RegisterRoutes(r, am)
|
||||
|
||||
@@ -28,7 +28,7 @@ import type {
|
||||
CloudintegrationtypesPostableAgentCheckInDTO,
|
||||
CloudintegrationtypesUpdatableAccountDTO,
|
||||
CloudintegrationtypesUpdatableServiceDTO,
|
||||
CreateAccount200,
|
||||
CreateAccount201,
|
||||
CreateAccountPathParameters,
|
||||
DisconnectAccountPathParameters,
|
||||
GetAccount200,
|
||||
@@ -36,10 +36,12 @@ import type {
|
||||
GetConnectionCredentials200,
|
||||
GetConnectionCredentialsPathParameters,
|
||||
GetService200,
|
||||
GetServiceParams,
|
||||
GetServicePathParameters,
|
||||
ListAccounts200,
|
||||
ListAccountsPathParameters,
|
||||
ListServicesMetadata200,
|
||||
ListServicesMetadataParams,
|
||||
ListServicesMetadataPathParameters,
|
||||
RenderErrorResponseDTO,
|
||||
UpdateAccountPathParameters,
|
||||
@@ -260,7 +262,7 @@ export const createAccount = (
|
||||
cloudintegrationtypesPostableAccountDTO: BodyType<CloudintegrationtypesPostableAccountDTO>,
|
||||
signal?: AbortSignal,
|
||||
) => {
|
||||
return GeneratedAPIInstance<CreateAccount200>({
|
||||
return GeneratedAPIInstance<CreateAccount201>({
|
||||
url: `/api/v1/cloud_integrations/${cloudProvider}/accounts`,
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
@@ -940,19 +942,25 @@ export const invalidateGetConnectionCredentials = async (
|
||||
*/
|
||||
export const listServicesMetadata = (
|
||||
{ cloudProvider }: ListServicesMetadataPathParameters,
|
||||
params?: ListServicesMetadataParams,
|
||||
signal?: AbortSignal,
|
||||
) => {
|
||||
return GeneratedAPIInstance<ListServicesMetadata200>({
|
||||
url: `/api/v1/cloud_integrations/${cloudProvider}/services`,
|
||||
method: 'GET',
|
||||
params,
|
||||
signal,
|
||||
});
|
||||
};
|
||||
|
||||
export const getListServicesMetadataQueryKey = ({
|
||||
cloudProvider,
|
||||
}: ListServicesMetadataPathParameters) => {
|
||||
return [`/api/v1/cloud_integrations/${cloudProvider}/services`] as const;
|
||||
export const getListServicesMetadataQueryKey = (
|
||||
{ cloudProvider }: ListServicesMetadataPathParameters,
|
||||
params?: ListServicesMetadataParams,
|
||||
) => {
|
||||
return [
|
||||
`/api/v1/cloud_integrations/${cloudProvider}/services`,
|
||||
...(params ? [params] : []),
|
||||
] as const;
|
||||
};
|
||||
|
||||
export const getListServicesMetadataQueryOptions = <
|
||||
@@ -960,6 +968,7 @@ export const getListServicesMetadataQueryOptions = <
|
||||
TError = ErrorType<RenderErrorResponseDTO>
|
||||
>(
|
||||
{ cloudProvider }: ListServicesMetadataPathParameters,
|
||||
params?: ListServicesMetadataParams,
|
||||
options?: {
|
||||
query?: UseQueryOptions<
|
||||
Awaited<ReturnType<typeof listServicesMetadata>>,
|
||||
@@ -971,11 +980,12 @@ export const getListServicesMetadataQueryOptions = <
|
||||
const { query: queryOptions } = options ?? {};
|
||||
|
||||
const queryKey =
|
||||
queryOptions?.queryKey ?? getListServicesMetadataQueryKey({ cloudProvider });
|
||||
queryOptions?.queryKey ??
|
||||
getListServicesMetadataQueryKey({ cloudProvider }, params);
|
||||
|
||||
const queryFn: QueryFunction<
|
||||
Awaited<ReturnType<typeof listServicesMetadata>>
|
||||
> = ({ signal }) => listServicesMetadata({ cloudProvider }, signal);
|
||||
> = ({ signal }) => listServicesMetadata({ cloudProvider }, params, signal);
|
||||
|
||||
return {
|
||||
queryKey,
|
||||
@@ -1003,6 +1013,7 @@ export function useListServicesMetadata<
|
||||
TError = ErrorType<RenderErrorResponseDTO>
|
||||
>(
|
||||
{ cloudProvider }: ListServicesMetadataPathParameters,
|
||||
params?: ListServicesMetadataParams,
|
||||
options?: {
|
||||
query?: UseQueryOptions<
|
||||
Awaited<ReturnType<typeof listServicesMetadata>>,
|
||||
@@ -1013,6 +1024,7 @@ export function useListServicesMetadata<
|
||||
): UseQueryResult<TData, TError> & { queryKey: QueryKey } {
|
||||
const queryOptions = getListServicesMetadataQueryOptions(
|
||||
{ cloudProvider },
|
||||
params,
|
||||
options,
|
||||
);
|
||||
|
||||
@@ -1031,10 +1043,11 @@ export function useListServicesMetadata<
|
||||
export const invalidateListServicesMetadata = async (
|
||||
queryClient: QueryClient,
|
||||
{ cloudProvider }: ListServicesMetadataPathParameters,
|
||||
params?: ListServicesMetadataParams,
|
||||
options?: InvalidateOptions,
|
||||
): Promise<QueryClient> => {
|
||||
await queryClient.invalidateQueries(
|
||||
{ queryKey: getListServicesMetadataQueryKey({ cloudProvider }) },
|
||||
{ queryKey: getListServicesMetadataQueryKey({ cloudProvider }, params) },
|
||||
options,
|
||||
);
|
||||
|
||||
@@ -1047,21 +1060,24 @@ export const invalidateListServicesMetadata = async (
|
||||
*/
|
||||
export const getService = (
|
||||
{ cloudProvider, serviceId }: GetServicePathParameters,
|
||||
params?: GetServiceParams,
|
||||
signal?: AbortSignal,
|
||||
) => {
|
||||
return GeneratedAPIInstance<GetService200>({
|
||||
url: `/api/v1/cloud_integrations/${cloudProvider}/services/${serviceId}`,
|
||||
method: 'GET',
|
||||
params,
|
||||
signal,
|
||||
});
|
||||
};
|
||||
|
||||
export const getGetServiceQueryKey = ({
|
||||
cloudProvider,
|
||||
serviceId,
|
||||
}: GetServicePathParameters) => {
|
||||
export const getGetServiceQueryKey = (
|
||||
{ cloudProvider, serviceId }: GetServicePathParameters,
|
||||
params?: GetServiceParams,
|
||||
) => {
|
||||
return [
|
||||
`/api/v1/cloud_integrations/${cloudProvider}/services/${serviceId}`,
|
||||
...(params ? [params] : []),
|
||||
] as const;
|
||||
};
|
||||
|
||||
@@ -1070,6 +1086,7 @@ export const getGetServiceQueryOptions = <
|
||||
TError = ErrorType<RenderErrorResponseDTO>
|
||||
>(
|
||||
{ cloudProvider, serviceId }: GetServicePathParameters,
|
||||
params?: GetServiceParams,
|
||||
options?: {
|
||||
query?: UseQueryOptions<
|
||||
Awaited<ReturnType<typeof getService>>,
|
||||
@@ -1081,11 +1098,12 @@ export const getGetServiceQueryOptions = <
|
||||
const { query: queryOptions } = options ?? {};
|
||||
|
||||
const queryKey =
|
||||
queryOptions?.queryKey ?? getGetServiceQueryKey({ cloudProvider, serviceId });
|
||||
queryOptions?.queryKey ??
|
||||
getGetServiceQueryKey({ cloudProvider, serviceId }, params);
|
||||
|
||||
const queryFn: QueryFunction<Awaited<ReturnType<typeof getService>>> = ({
|
||||
signal,
|
||||
}) => getService({ cloudProvider, serviceId }, signal);
|
||||
}) => getService({ cloudProvider, serviceId }, params, signal);
|
||||
|
||||
return {
|
||||
queryKey,
|
||||
@@ -1111,6 +1129,7 @@ export function useGetService<
|
||||
TError = ErrorType<RenderErrorResponseDTO>
|
||||
>(
|
||||
{ cloudProvider, serviceId }: GetServicePathParameters,
|
||||
params?: GetServiceParams,
|
||||
options?: {
|
||||
query?: UseQueryOptions<
|
||||
Awaited<ReturnType<typeof getService>>,
|
||||
@@ -1121,6 +1140,7 @@ export function useGetService<
|
||||
): UseQueryResult<TData, TError> & { queryKey: QueryKey } {
|
||||
const queryOptions = getGetServiceQueryOptions(
|
||||
{ cloudProvider, serviceId },
|
||||
params,
|
||||
options,
|
||||
);
|
||||
|
||||
@@ -1139,10 +1159,11 @@ export function useGetService<
|
||||
export const invalidateGetService = async (
|
||||
queryClient: QueryClient,
|
||||
{ cloudProvider, serviceId }: GetServicePathParameters,
|
||||
params?: GetServiceParams,
|
||||
options?: InvalidateOptions,
|
||||
): Promise<QueryClient> => {
|
||||
await queryClient.invalidateQueries(
|
||||
{ queryKey: getGetServiceQueryKey({ cloudProvider, serviceId }) },
|
||||
{ queryKey: getGetServiceQueryKey({ cloudProvider, serviceId }, params) },
|
||||
options,
|
||||
);
|
||||
|
||||
|
||||
@@ -3589,7 +3589,7 @@ export type ListAccounts200 = {
|
||||
export type CreateAccountPathParameters = {
|
||||
cloudProvider: string;
|
||||
};
|
||||
export type CreateAccount200 = {
|
||||
export type CreateAccount201 = {
|
||||
data: CloudintegrationtypesGettableAccountWithConnectionArtifactDTO;
|
||||
/**
|
||||
* @type string
|
||||
@@ -3647,6 +3647,14 @@ export type GetConnectionCredentials200 = {
|
||||
export type ListServicesMetadataPathParameters = {
|
||||
cloudProvider: string;
|
||||
};
|
||||
export type ListServicesMetadataParams = {
|
||||
/**
|
||||
* @type string
|
||||
* @description undefined
|
||||
*/
|
||||
cloud_integration_id?: string;
|
||||
};
|
||||
|
||||
export type ListServicesMetadata200 = {
|
||||
data: CloudintegrationtypesGettableServicesMetadataDTO;
|
||||
/**
|
||||
@@ -3659,6 +3667,14 @@ export type GetServicePathParameters = {
|
||||
cloudProvider: string;
|
||||
serviceId: string;
|
||||
};
|
||||
export type GetServiceParams = {
|
||||
/**
|
||||
* @type string
|
||||
* @description undefined
|
||||
*/
|
||||
cloud_integration_id?: string;
|
||||
};
|
||||
|
||||
export type GetService200 = {
|
||||
data: CloudintegrationtypesServiceDTO;
|
||||
/**
|
||||
|
||||
@@ -677,6 +677,18 @@ function NewWidget({
|
||||
queryType: currentQuery.queryType,
|
||||
isNewPanel,
|
||||
dataSource: currentQuery?.builder?.queryData?.[0]?.dataSource,
|
||||
...(currentQuery.queryType === EQueryType.CLICKHOUSE && {
|
||||
clickhouseQueryCount: currentQuery.clickhouse_sql.length,
|
||||
clickhouseQueries: currentQuery.clickhouse_sql.map((q) => ({
|
||||
name: q.name,
|
||||
query: (q.query ?? '')
|
||||
.replace(/--[^\n]*/g, '') // strip line comments
|
||||
.replace(/\/\*[\s\S]*?\*\//g, '') // strip block comments
|
||||
.replace(/'(?:[^'\\]|\\.|'')*'/g, "'?'") // replace single-quoted strings (handles \' and '' escapes)
|
||||
.replace(/\b\d+(?:\.\d+)?(?:[eE][+-]?\d+)?\b/g, '?'), // replace numeric literals (int, float, scientific)
|
||||
disabled: q.disabled,
|
||||
})),
|
||||
}),
|
||||
});
|
||||
setSaveModal(true);
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
|
||||
@@ -41,7 +41,7 @@ func (provider *provider) addCloudIntegrationRoutes(router *mux.Router) error {
|
||||
RequestContentType: "application/json",
|
||||
Response: new(citypes.GettableAccountWithConnectionArtifact),
|
||||
ResponseContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusOK,
|
||||
SuccessStatusCode: http.StatusCreated,
|
||||
ErrorStatusCodes: []int{},
|
||||
Deprecated: false,
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleAdmin),
|
||||
@@ -138,6 +138,7 @@ func (provider *provider) addCloudIntegrationRoutes(router *mux.Router) error {
|
||||
Summary: "List services metadata",
|
||||
Description: "This endpoint lists the services metadata for the specified cloud provider",
|
||||
Request: nil,
|
||||
RequestQuery: new(citypes.ListServicesMetadataParams),
|
||||
RequestContentType: "",
|
||||
Response: new(citypes.GettableServicesMetadata),
|
||||
ResponseContentType: "application/json",
|
||||
@@ -158,6 +159,7 @@ func (provider *provider) addCloudIntegrationRoutes(router *mux.Router) error {
|
||||
Summary: "Get service",
|
||||
Description: "This endpoint gets a service for the specified cloud provider",
|
||||
Request: nil,
|
||||
RequestQuery: new(citypes.GetServiceParams),
|
||||
RequestContentType: "",
|
||||
Response: new(citypes.Service),
|
||||
ResponseContentType: "application/json",
|
||||
|
||||
@@ -63,6 +63,7 @@ type RetryConfig struct {
|
||||
|
||||
func newConfig() factory.Config {
|
||||
return Config{
|
||||
Provider: "noop",
|
||||
BufferSize: 1000,
|
||||
BatchSize: 100,
|
||||
FlushInterval: time.Second,
|
||||
|
||||
@@ -208,7 +208,7 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
|
||||
s.config.APIServer.Timeout.Default,
|
||||
s.config.APIServer.Timeout.Max,
|
||||
).Wrap)
|
||||
r.Use(middleware.NewAudit(s.signoz.Instrumentation.Logger(), s.config.APIServer.Logging.ExcludedRoutes, nil).Wrap)
|
||||
r.Use(middleware.NewAudit(s.signoz.Instrumentation.Logger(), s.config.APIServer.Logging.ExcludedRoutes, s.signoz.Auditor).Wrap)
|
||||
r.Use(middleware.NewComment().Wrap)
|
||||
|
||||
am := middleware.NewAuthZ(s.signoz.Instrumentation.Logger(), s.signoz.Modules.OrgGetter, s.signoz.Authz)
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/analytics"
|
||||
"github.com/SigNoz/signoz/pkg/apiserver"
|
||||
"github.com/SigNoz/signoz/pkg/auditor"
|
||||
"github.com/SigNoz/signoz/pkg/cache"
|
||||
"github.com/SigNoz/signoz/pkg/config"
|
||||
"github.com/SigNoz/signoz/pkg/emailing"
|
||||
@@ -123,6 +124,9 @@ type Config struct {
|
||||
|
||||
// ServiceAccount config
|
||||
ServiceAccount serviceaccount.Config `mapstructure:"serviceaccount"`
|
||||
|
||||
// Auditor config
|
||||
Auditor auditor.Config `mapstructure:"auditor"`
|
||||
}
|
||||
|
||||
func NewConfig(ctx context.Context, logger *slog.Logger, resolverConfig config.ResolverConfig) (Config, error) {
|
||||
@@ -153,6 +157,7 @@ func NewConfig(ctx context.Context, logger *slog.Logger, resolverConfig config.R
|
||||
user.NewConfigFactory(),
|
||||
identn.NewConfigFactory(),
|
||||
serviceaccount.NewConfigFactory(),
|
||||
auditor.NewConfigFactory(),
|
||||
}
|
||||
|
||||
conf, err := config.New(ctx, resolverConfig, configFactories)
|
||||
|
||||
@@ -3,6 +3,8 @@ package signoz
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager/nfmanager"
|
||||
"github.com/SigNoz/signoz/pkg/auditor"
|
||||
"github.com/SigNoz/signoz/pkg/auditor/noopauditor"
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager/nfmanager/rulebasednotification"
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/analytics"
|
||||
@@ -312,6 +314,12 @@ func NewGlobalProviderFactories(identNConfig identn.Config) factory.NamedMap[fac
|
||||
)
|
||||
}
|
||||
|
||||
func NewAuditorProviderFactories() factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
noopauditor.NewFactory(),
|
||||
)
|
||||
}
|
||||
|
||||
func NewFlaggerProviderFactories(registry featuretypes.Registry) factory.NamedMap[factory.ProviderFactory[flagger.FlaggerProvider, flagger.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
configflagger.NewFactory(registry),
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager/nfmanager"
|
||||
"github.com/SigNoz/signoz/pkg/auditor"
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager/nfmanager/nfroutingstore/sqlroutingstore"
|
||||
"github.com/SigNoz/signoz/pkg/analytics"
|
||||
"github.com/SigNoz/signoz/pkg/apiserver"
|
||||
@@ -75,6 +76,7 @@ type SigNoz struct {
|
||||
QueryParser queryparser.QueryParser
|
||||
Flagger flagger.Flagger
|
||||
Gateway gateway.Gateway
|
||||
Auditor auditor.Auditor
|
||||
}
|
||||
|
||||
func New(
|
||||
@@ -94,6 +96,7 @@ func New(
|
||||
authzCallback func(context.Context, sqlstore.SQLStore, licensing.Licensing, dashboard.Module) (factory.ProviderFactory[authz.AuthZ, authz.Config], error),
|
||||
dashboardModuleCallback func(sqlstore.SQLStore, factory.ProviderSettings, analytics.Analytics, organization.Getter, queryparser.QueryParser, querier.Querier, licensing.Licensing) dashboard.Module,
|
||||
gatewayProviderFactory func(licensing.Licensing) factory.ProviderFactory[gateway.Gateway, gateway.Config],
|
||||
auditorProviderFactories func(licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]],
|
||||
querierHandlerCallback func(factory.ProviderSettings, querier.Querier, analytics.Analytics) querier.Handler,
|
||||
) (*SigNoz, error) {
|
||||
// Initialize instrumentation
|
||||
@@ -371,6 +374,12 @@ func New(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize auditor from the variant-specific provider factories
|
||||
auditor, err := factory.NewProviderFromNamedMap(ctx, providerSettings, config.Auditor, auditorProviderFactories(licensing), config.Auditor.Provider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize authns
|
||||
store := sqlauthnstore.NewStore(sqlstore)
|
||||
authNs, err := authNsCallback(ctx, providerSettings, store, licensing)
|
||||
@@ -470,6 +479,7 @@ func New(
|
||||
factory.NewNamedService(factory.MustNewName("tokenizer"), tokenizer),
|
||||
factory.NewNamedService(factory.MustNewName("authz"), authz),
|
||||
factory.NewNamedService(factory.MustNewName("user"), userService, factory.MustNewName("authz")),
|
||||
factory.NewNamedService(factory.MustNewName("auditor"), auditor),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -516,5 +526,6 @@ func New(
|
||||
QueryParser: queryParser,
|
||||
Flagger: flagger,
|
||||
Gateway: gateway,
|
||||
Auditor: auditor,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -6,8 +6,6 @@ import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
@@ -25,7 +23,6 @@ type provider struct {
|
||||
bundb *sqlstore.BunDB
|
||||
dialect *dialect
|
||||
formatter sqlstore.SQLFormatter
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func NewFactory(hookFactories ...factory.ProviderFactory[sqlstore.SQLStoreHook, sqlstore.Config]) factory.ProviderFactory[sqlstore.SQLStore, sqlstore.Config] {
|
||||
@@ -62,19 +59,13 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
|
||||
|
||||
sqliteDialect := sqlitedialect.New()
|
||||
bunDB := sqlstore.NewBunDB(settings, sqldb, sqliteDialect, hooks)
|
||||
|
||||
done := make(chan struct{})
|
||||
p := &provider{
|
||||
return &provider{
|
||||
settings: settings,
|
||||
sqldb: sqldb,
|
||||
bundb: bunDB,
|
||||
dialect: new(dialect),
|
||||
formatter: newFormatter(bunDB.Dialect()),
|
||||
done: done,
|
||||
}
|
||||
go p.walDiagnosticLoop(config.Sqlite.Path)
|
||||
|
||||
return p, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (provider *provider) BunDB() *bun.DB {
|
||||
@@ -118,73 +109,3 @@ func (provider *provider) WrapAlreadyExistsErrf(err error, code errors.Code, for
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// walDiagnosticLoop periodically logs pool stats, WAL file size, and busy prepared statements
|
||||
// to help diagnose WAL checkpoint failures caused by permanent read locks.
|
||||
func (provider *provider) walDiagnosticLoop(dbPath string) {
|
||||
ticker := time.NewTicker(60 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
logger := provider.settings.Logger()
|
||||
walPath := dbPath + "-wal"
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-provider.done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
// 1. Log pool stats (no SQL needed)
|
||||
stats := provider.sqldb.Stats()
|
||||
logger.Info("sqlite_pool_stats",
|
||||
slog.Int("max_open", stats.MaxOpenConnections),
|
||||
slog.Int("open", stats.OpenConnections),
|
||||
slog.Int("in_use", stats.InUse),
|
||||
slog.Int("idle", stats.Idle),
|
||||
slog.Int64("wait_count", stats.WaitCount),
|
||||
slog.String("wait_duration", stats.WaitDuration.String()),
|
||||
slog.Int64("max_idle_closed", stats.MaxIdleClosed),
|
||||
slog.Int64("max_idle_time_closed", stats.MaxIdleTimeClosed),
|
||||
slog.Int64("max_lifetime_closed", stats.MaxLifetimeClosed),
|
||||
)
|
||||
|
||||
// 2. Log WAL file size (no SQL needed)
|
||||
if info, err := os.Stat(walPath); err == nil {
|
||||
logger.Info("sqlite_wal_size",
|
||||
slog.Int64("bytes", info.Size()),
|
||||
slog.String("path", walPath),
|
||||
)
|
||||
}
|
||||
|
||||
// 3. Check for busy prepared statements on a single pool connection
|
||||
provider.checkBusyStatements(logger)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *provider) checkBusyStatements(logger *slog.Logger) {
|
||||
conn, err := provider.sqldb.Conn(context.Background())
|
||||
if err != nil {
|
||||
logger.Warn("sqlite_diag_conn_error", slog.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
rows, err := conn.QueryContext(context.Background(), "SELECT sql FROM sqlite_stmt WHERE busy")
|
||||
if err != nil {
|
||||
logger.Warn("sqlite_diag_query_error", slog.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var stmtSQL string
|
||||
if err := rows.Scan(&stmtSQL); err != nil {
|
||||
logger.Warn("sqlite_diag_scan_error", slog.String("error", err.Error()))
|
||||
continue
|
||||
}
|
||||
logger.Warn("leaked_busy_statement", slog.String("sql", stmtSQL))
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
logger.Warn("sqlite_diag_rows_error", slog.String("error", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,6 +62,10 @@ type GettableServicesMetadata struct {
|
||||
Services []*ServiceMetadata `json:"services" required:"true" nullable:"false"`
|
||||
}
|
||||
|
||||
type ListServicesMetadataParams struct {
|
||||
CloudIntegrationID valuer.UUID `query:"cloud_integration_id" required:"false"`
|
||||
}
|
||||
|
||||
// Service represents a cloud integration service with its definition,
|
||||
// cloud integration service is non nil only when the service entry exists in DB with ANY config (enabled or disabled).
|
||||
type Service struct {
|
||||
@@ -69,6 +73,10 @@ type Service struct {
|
||||
CloudIntegrationService *CloudIntegrationService `json:"cloudIntegrationService" required:"true" nullable:"true"`
|
||||
}
|
||||
|
||||
type GetServiceParams struct {
|
||||
CloudIntegrationID valuer.UUID `query:"cloud_integration_id" required:"false"`
|
||||
}
|
||||
|
||||
type UpdatableService struct {
|
||||
Config *ServiceConfig `json:"config" required:"true" nullable:"false"`
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ pytest_plugins = [
|
||||
"fixtures.notification_channel",
|
||||
"fixtures.alerts",
|
||||
"fixtures.cloudintegrations",
|
||||
"fixtures.jsontypeexporter",
|
||||
]
|
||||
|
||||
|
||||
|
||||
437
tests/integration/fixtures/jsontypeexporter.py
Normal file
437
tests/integration/fixtures/jsontypeexporter.py
Normal file
@@ -0,0 +1,437 @@
|
||||
"""
|
||||
Simpler version of jsontypeexporter for test fixtures.
|
||||
This exports JSON type metadata to the path_types table by parsing JSON bodies
|
||||
and extracting all paths with their types, similar to how the real jsontypeexporter works.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import json
|
||||
from abc import ABC
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Callable,
|
||||
Dict,
|
||||
Generator,
|
||||
List,
|
||||
Optional,
|
||||
Set,
|
||||
Union,
|
||||
)
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from fixtures import types
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
|
||||
class JSONPathType(ABC):
|
||||
"""Represents a JSON path with its type information"""
|
||||
|
||||
path: str
|
||||
type: str
|
||||
last_seen: np.uint64
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
path: str,
|
||||
type: str, # pylint: disable=redefined-builtin
|
||||
last_seen: Optional[datetime.datetime] = None,
|
||||
) -> None:
|
||||
self.path = path
|
||||
self.type = type
|
||||
if last_seen is None:
|
||||
last_seen = datetime.datetime.now()
|
||||
self.last_seen = np.uint64(int(last_seen.timestamp() * 1e9))
|
||||
|
||||
def np_arr(self) -> np.array:
|
||||
"""Return path type data as numpy array for database insertion"""
|
||||
return np.array([self.path, self.type, self.last_seen])
|
||||
|
||||
|
||||
# Constants matching jsontypeexporter
|
||||
ARRAY_SEPARATOR = "[]." # Used in paths like "education[].name"
|
||||
ARRAY_SUFFIX = "[]" # Used when traversing into array element objects
|
||||
|
||||
|
||||
def _infer_array_type_from_type_strings(types: List[str]) -> Optional[str]:
|
||||
"""
|
||||
Infer array type from a list of pre-classified type strings.
|
||||
Matches jsontypeexporter's inferArrayMask logic (v0.144.2+).
|
||||
|
||||
Type strings are: "JSON", "String", "Bool", "Float64", "Int64"
|
||||
|
||||
SuperTyping rules (matching Go inferArrayMask):
|
||||
- JSON alone → Array(JSON)
|
||||
- JSON + any primitive → Array(Dynamic)
|
||||
- String alone → Array(Nullable(String)); String + other → Array(Dynamic)
|
||||
- Float64 wins over Int64 and Bool
|
||||
- Int64 wins over Bool
|
||||
- Bool alone → Array(Nullable(Bool))
|
||||
"""
|
||||
if len(types) == 0:
|
||||
return None
|
||||
|
||||
unique = set(types)
|
||||
|
||||
has_json = "JSON" in unique
|
||||
# hasPrimitive mirrors Go: (hasJSON && len(unique) > 1) || (!hasJSON && len(unique) > 0)
|
||||
has_primitive = (has_json and len(unique) > 1) or (not has_json and len(unique) > 0)
|
||||
|
||||
if has_json:
|
||||
if not has_primitive:
|
||||
return "Array(JSON)"
|
||||
return "Array(Dynamic)"
|
||||
|
||||
# ---- Primitive Type Resolution (Float > Int > Bool) ----
|
||||
if "String" in unique:
|
||||
if len(unique) > 1:
|
||||
return "Array(Dynamic)"
|
||||
return "Array(Nullable(String))"
|
||||
|
||||
if "Float64" in unique:
|
||||
return "Array(Nullable(Float64))"
|
||||
if "Int64" in unique:
|
||||
return "Array(Nullable(Int64))"
|
||||
if "Bool" in unique:
|
||||
return "Array(Nullable(Bool))"
|
||||
|
||||
return "Array(Dynamic)"
|
||||
|
||||
|
||||
def _infer_array_type(elements: List[Any]) -> Optional[str]:
|
||||
"""
|
||||
Infer array type from raw Python list elements.
|
||||
Classifies each element then delegates to _infer_array_type_from_type_strings.
|
||||
"""
|
||||
if len(elements) == 0:
|
||||
return None
|
||||
|
||||
types = []
|
||||
for elem in elements:
|
||||
if elem is None:
|
||||
continue
|
||||
if isinstance(elem, dict):
|
||||
types.append("JSON")
|
||||
elif isinstance(elem, str):
|
||||
types.append("String")
|
||||
elif isinstance(elem, bool): # must be before int (bool is subclass of int)
|
||||
types.append("Bool")
|
||||
elif isinstance(elem, float):
|
||||
types.append("Float64")
|
||||
elif isinstance(elem, int):
|
||||
types.append("Int64")
|
||||
|
||||
return _infer_array_type_from_type_strings(types)
|
||||
|
||||
|
||||
def _python_type_to_clickhouse_type(value: Any) -> str:
|
||||
"""
|
||||
Convert Python type to ClickHouse JSON type string.
|
||||
Maps Python types to ClickHouse JSON data types.
|
||||
"""
|
||||
if value is None:
|
||||
return "String" # Default for null values
|
||||
|
||||
if isinstance(value, bool):
|
||||
return "Bool"
|
||||
elif isinstance(value, int):
|
||||
return "Int64"
|
||||
elif isinstance(value, float):
|
||||
return "Float64"
|
||||
elif isinstance(value, str):
|
||||
return "String"
|
||||
elif isinstance(value, list):
|
||||
# Use the sophisticated array type inference
|
||||
array_type = _infer_array_type(value)
|
||||
return array_type if array_type else "Array(Dynamic)"
|
||||
elif isinstance(value, dict):
|
||||
return "JSON"
|
||||
else:
|
||||
return "String" # Default fallback
|
||||
|
||||
|
||||
def _extract_json_paths(
|
||||
obj: Any,
|
||||
current_path: str = "",
|
||||
path_types: Optional[Dict[str, Set[str]]] = None,
|
||||
level: int = 0,
|
||||
) -> Dict[str, Set[str]]:
|
||||
"""
|
||||
Recursively extract all paths and their types from a JSON object.
|
||||
Matches jsontypeexporter's analyzePValue logic.
|
||||
|
||||
Args:
|
||||
obj: The JSON object to traverse
|
||||
current_path: Current path being built (e.g., "user.name")
|
||||
path_types: Dictionary mapping paths to sets of types found
|
||||
level: Current nesting level (for depth limiting)
|
||||
|
||||
Returns:
|
||||
Dictionary mapping paths to sets of type strings
|
||||
"""
|
||||
if path_types is None:
|
||||
path_types = {}
|
||||
|
||||
if obj is None:
|
||||
if current_path:
|
||||
if current_path not in path_types:
|
||||
path_types[current_path] = set()
|
||||
path_types[current_path].add("String") # Null defaults to String
|
||||
return path_types
|
||||
|
||||
if isinstance(obj, dict):
|
||||
# For objects, add the object itself and recurse into keys
|
||||
if current_path:
|
||||
if current_path not in path_types:
|
||||
path_types[current_path] = set()
|
||||
path_types[current_path].add("JSON")
|
||||
|
||||
for key, value in obj.items():
|
||||
# Build the path for this key
|
||||
if current_path:
|
||||
new_path = f"{current_path}.{key}"
|
||||
else:
|
||||
new_path = key
|
||||
|
||||
# Recurse into the value
|
||||
_extract_json_paths(value, new_path, path_types, level + 1)
|
||||
|
||||
elif isinstance(obj, list):
|
||||
# Skip empty arrays
|
||||
if len(obj) == 0:
|
||||
return path_types
|
||||
|
||||
# Collect types from array elements (matching Go: types := make([]pcommon.ValueType, 0, s.Len()))
|
||||
types = []
|
||||
|
||||
for item in obj:
|
||||
if isinstance(item, dict):
|
||||
# When traversing into array element objects, use ArraySuffix ([])
|
||||
# This matches: prefix+ArraySuffix in the Go code
|
||||
# Example: if current_path is "education", we use "education[]" to traverse into objects
|
||||
array_prefix = current_path + ARRAY_SUFFIX if current_path else ""
|
||||
for key, value in item.items():
|
||||
if array_prefix:
|
||||
# Use array separator: education[].name
|
||||
array_path = f"{array_prefix}.{key}"
|
||||
else:
|
||||
array_path = key
|
||||
# Recurse without increasing level (matching Go behavior)
|
||||
_extract_json_paths(value, array_path, path_types, level)
|
||||
types.append("JSON")
|
||||
elif isinstance(item, list):
|
||||
# Arrays inside arrays are not supported - skip the whole path
|
||||
# Matching Go: e.logger.Error("arrays inside arrays are not supported!", ...); return nil
|
||||
return path_types
|
||||
elif isinstance(item, str):
|
||||
types.append("String")
|
||||
elif isinstance(item, bool):
|
||||
types.append("Bool")
|
||||
elif isinstance(item, float):
|
||||
types.append("Float64")
|
||||
elif isinstance(item, int):
|
||||
types.append("Int64")
|
||||
|
||||
# Infer array type from collected types (matching Go: if mask := inferArrayMask(types); mask != 0)
|
||||
if len(types) > 0:
|
||||
array_type = _infer_array_type_from_type_strings(types)
|
||||
if array_type and current_path:
|
||||
if current_path not in path_types:
|
||||
path_types[current_path] = set()
|
||||
path_types[current_path].add(array_type)
|
||||
|
||||
else:
|
||||
# Primitive value (string, number, bool)
|
||||
if current_path:
|
||||
if current_path not in path_types:
|
||||
path_types[current_path] = set()
|
||||
obj_type = _python_type_to_clickhouse_type(obj)
|
||||
path_types[current_path].add(obj_type)
|
||||
|
||||
return path_types
|
||||
|
||||
|
||||
def _parse_json_bodies_and_extract_paths(
|
||||
json_bodies: List[str],
|
||||
timestamp: Optional[datetime.datetime] = None,
|
||||
) -> List[JSONPathType]:
|
||||
"""
|
||||
Parse JSON bodies and extract all paths with their types.
|
||||
This mimics the behavior of jsontypeexporter.
|
||||
|
||||
Args:
|
||||
json_bodies: List of JSON body strings to parse
|
||||
timestamp: Timestamp to use for last_seen (defaults to now)
|
||||
|
||||
Returns:
|
||||
List of JSONPathType objects with all discovered paths and types
|
||||
"""
|
||||
if timestamp is None:
|
||||
timestamp = datetime.datetime.now()
|
||||
|
||||
# Aggregate all paths and their types across all JSON bodies
|
||||
all_path_types: Dict[str, Set[str]] = {}
|
||||
|
||||
for json_body in json_bodies:
|
||||
try:
|
||||
parsed = json.loads(json_body)
|
||||
_extract_json_paths(parsed, "", all_path_types, level=0)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
# Skip invalid JSON
|
||||
continue
|
||||
|
||||
# Convert to list of JSONPathType objects
|
||||
# Each path can have multiple types, so we create one JSONPathType per type
|
||||
path_type_objects: List[JSONPathType] = []
|
||||
for path, types_set in all_path_types.items():
|
||||
for type_str in types_set:
|
||||
path_type_objects.append(
|
||||
JSONPathType(path=path, type=type_str, last_seen=timestamp)
|
||||
)
|
||||
|
||||
return path_type_objects
|
||||
|
||||
|
||||
@pytest.fixture(name="export_json_types", scope="function")
|
||||
def export_json_types(
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
request: pytest.FixtureRequest, # To access migrator fixture
|
||||
) -> Generator[
|
||||
Callable[[Union[List[JSONPathType], List[str], List[Any]]], None], Any, None
|
||||
]:
|
||||
"""
|
||||
Fixture for exporting JSON type metadata to the path_types table.
|
||||
This is a simpler version of jsontypeexporter for test fixtures.
|
||||
|
||||
The function can accept:
|
||||
1. List of JSONPathType objects (manual specification)
|
||||
2. List of JSON body strings (auto-extract paths)
|
||||
3. List of Logs objects (extract from body_json field)
|
||||
|
||||
Usage examples:
|
||||
# Manual specification
|
||||
export_json_types([
|
||||
JSONPathType(path="user.name", type="String"),
|
||||
JSONPathType(path="user.age", type="Int64"),
|
||||
])
|
||||
|
||||
# Auto-extract from JSON strings
|
||||
export_json_types([
|
||||
'{"user": {"name": "alice", "age": 25}}',
|
||||
'{"user": {"name": "bob", "age": 30}}',
|
||||
])
|
||||
|
||||
# Auto-extract from Logs objects
|
||||
export_json_types(logs_list)
|
||||
"""
|
||||
# Ensure migrator has run to create the table
|
||||
try:
|
||||
request.getfixturevalue("migrator")
|
||||
except Exception:
|
||||
# If migrator fixture is not available, that's okay - table might already exist
|
||||
pass
|
||||
|
||||
def _export_json_types(
|
||||
data: Union[
|
||||
List[JSONPathType], List[str], List[Any]
|
||||
], # List[Logs] but avoiding circular import
|
||||
) -> None:
|
||||
"""
|
||||
Export JSON type metadata to signoz_metadata.distributed_json_path_types table.
|
||||
This table stores path and type information for body JSON fields.
|
||||
"""
|
||||
path_types: List[JSONPathType] = []
|
||||
|
||||
if len(data) == 0:
|
||||
return
|
||||
|
||||
# Determine input type and convert to JSONPathType list
|
||||
first_item = data[0]
|
||||
|
||||
if isinstance(first_item, JSONPathType):
|
||||
# Already JSONPathType objects
|
||||
path_types = data # type: ignore
|
||||
elif isinstance(first_item, str):
|
||||
# List of JSON strings - parse and extract paths
|
||||
path_types = _parse_json_bodies_and_extract_paths(data) # type: ignore
|
||||
else:
|
||||
# Assume it's a list of Logs objects - extract body_v2
|
||||
json_bodies: List[str] = []
|
||||
for log in data: # type: ignore
|
||||
# Try to get body_v2 attribute
|
||||
if hasattr(log, "body_v2") and log.body_v2:
|
||||
json_bodies.append(log.body_v2)
|
||||
elif hasattr(log, "body") and log.body:
|
||||
# Fallback to body if body_v2 not available
|
||||
try:
|
||||
# Try to parse as JSON
|
||||
json.loads(log.body)
|
||||
json_bodies.append(log.body)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
|
||||
if json_bodies:
|
||||
path_types = _parse_json_bodies_and_extract_paths(json_bodies)
|
||||
|
||||
if len(path_types) == 0:
|
||||
return
|
||||
|
||||
clickhouse.conn.insert(
|
||||
database="signoz_metadata",
|
||||
table="distributed_json_path_types",
|
||||
data=[path_type.np_arr() for path_type in path_types],
|
||||
column_names=[
|
||||
"path",
|
||||
"type",
|
||||
"last_seen",
|
||||
],
|
||||
)
|
||||
|
||||
yield _export_json_types
|
||||
|
||||
# Cleanup - truncate the local table after tests (following pattern from logs fixture)
|
||||
clickhouse.conn.query(
|
||||
f"TRUNCATE TABLE signoz_metadata.json_path_types ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(name="export_promoted_paths", scope="function")
|
||||
def export_promoted_paths(
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
request: pytest.FixtureRequest, # To access migrator fixture
|
||||
) -> Generator[Callable[[List[str]], None], Any, None]:
|
||||
"""
|
||||
Fixture for exporting promoted JSON paths to the promoted paths table.
|
||||
"""
|
||||
# Ensure migrator has run to create the table
|
||||
try:
|
||||
request.getfixturevalue("migrator")
|
||||
except Exception:
|
||||
# If migrator fixture is not available, that's okay - table might already exist
|
||||
pass
|
||||
|
||||
def _export_promoted_paths(paths: List[str]) -> None:
|
||||
if len(paths) == 0:
|
||||
return
|
||||
|
||||
now_ms = int(datetime.datetime.now().timestamp() * 1000)
|
||||
rows = [(path, now_ms) for path in paths]
|
||||
clickhouse.conn.insert(
|
||||
database="signoz_metadata",
|
||||
table="distributed_json_promoted_paths",
|
||||
data=rows,
|
||||
column_names=[
|
||||
"path",
|
||||
"created_at",
|
||||
],
|
||||
)
|
||||
|
||||
yield _export_promoted_paths
|
||||
|
||||
clickhouse.conn.query(
|
||||
f"TRUNCATE TABLE signoz_metadata.json_promoted_paths ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
|
||||
)
|
||||
@@ -122,6 +122,8 @@ class Logs(ABC):
|
||||
resources: dict[str, Any] = {},
|
||||
attributes: dict[str, Any] = {},
|
||||
body: str = "default body",
|
||||
body_v2: Optional[str] = None,
|
||||
body_promoted: Optional[str] = None,
|
||||
severity_text: str = "INFO",
|
||||
trace_id: str = "",
|
||||
span_id: str = "",
|
||||
@@ -167,6 +169,33 @@ class Logs(ABC):
|
||||
# Set body
|
||||
self.body = body
|
||||
|
||||
# Set body_v2 - if body is JSON, parse and stringify it, otherwise use empty string
|
||||
# ClickHouse accepts String input for JSON column
|
||||
if body_v2 is not None:
|
||||
self.body_v2 = body_v2
|
||||
else:
|
||||
# Try to parse body as JSON; if successful use it directly,
|
||||
# otherwise wrap as {"message": body} matching the normalize operator behavior.
|
||||
try:
|
||||
json.loads(body)
|
||||
self.body_v2 = body
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
self.body_v2 = json.dumps({"message": body})
|
||||
|
||||
# Set body_promoted - must be valid JSON
|
||||
# Tests will explicitly pass promoted column's content, but we validate it
|
||||
if body_promoted is not None:
|
||||
# Validate that it's valid JSON
|
||||
try:
|
||||
json.loads(body_promoted)
|
||||
self.body_promoted = body_promoted
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
# If invalid, default to empty JSON object
|
||||
self.body_promoted = "{}"
|
||||
else:
|
||||
# Default to empty JSON object (valid JSON)
|
||||
self.body_promoted = "{}"
|
||||
|
||||
# Process resources and attributes
|
||||
self.resources_string = {k: str(v) for k, v in resources.items()}
|
||||
self.resource_json = (
|
||||
@@ -326,6 +355,8 @@ class Logs(ABC):
|
||||
self.severity_text,
|
||||
self.severity_number,
|
||||
self.body,
|
||||
self.body_v2,
|
||||
self.body_promoted,
|
||||
self.attributes_string,
|
||||
self.attributes_number,
|
||||
self.attributes_bool,
|
||||
@@ -454,31 +485,53 @@ def insert_logs(
|
||||
data=[resource_key.np_arr() for resource_key in resource_keys],
|
||||
)
|
||||
|
||||
# All columns in insertion order (must match Logs.np_arr() order)
|
||||
all_column_names = [
|
||||
"ts_bucket_start",
|
||||
"resource_fingerprint",
|
||||
"timestamp",
|
||||
"observed_timestamp",
|
||||
"id",
|
||||
"trace_id",
|
||||
"span_id",
|
||||
"trace_flags",
|
||||
"severity_text",
|
||||
"severity_number",
|
||||
"body",
|
||||
"body_v2",
|
||||
"body_promoted",
|
||||
"attributes_string",
|
||||
"attributes_number",
|
||||
"attributes_bool",
|
||||
"resources_string",
|
||||
"scope_name",
|
||||
"scope_version",
|
||||
"scope_string",
|
||||
"resource",
|
||||
]
|
||||
|
||||
# Check if body_v2 column exists (only present when ENABLE_LOGS_MIGRATIONS_V2 migration has run)
|
||||
result = clickhouse.conn.query(
|
||||
"SELECT count() FROM system.columns WHERE database = 'signoz_logs' AND table = 'logs_v2' AND name = 'body_v2'"
|
||||
)
|
||||
has_json_body = result.result_rows[0][0] > 0
|
||||
|
||||
if has_json_body:
|
||||
column_names = all_column_names
|
||||
data = [log.np_arr() for log in logs]
|
||||
else:
|
||||
json_body_cols = {"body_v2", "body_promoted"}
|
||||
keep_indices = [
|
||||
i for i, c in enumerate(all_column_names) if c not in json_body_cols
|
||||
]
|
||||
column_names = [all_column_names[i] for i in keep_indices]
|
||||
data = [log.np_arr()[keep_indices] for log in logs]
|
||||
|
||||
clickhouse.conn.insert(
|
||||
database="signoz_logs",
|
||||
table="distributed_logs_v2",
|
||||
data=[log.np_arr() for log in logs],
|
||||
column_names=[
|
||||
"ts_bucket_start",
|
||||
"resource_fingerprint",
|
||||
"timestamp",
|
||||
"observed_timestamp",
|
||||
"id",
|
||||
"trace_id",
|
||||
"span_id",
|
||||
"trace_flags",
|
||||
"severity_text",
|
||||
"severity_number",
|
||||
"body",
|
||||
"attributes_string",
|
||||
"attributes_number",
|
||||
"attributes_bool",
|
||||
"resources_string",
|
||||
"scope_name",
|
||||
"scope_version",
|
||||
"scope_string",
|
||||
"resource",
|
||||
],
|
||||
data=data,
|
||||
column_names=column_names,
|
||||
)
|
||||
|
||||
yield _insert_logs
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from typing import Optional
|
||||
|
||||
import docker
|
||||
import pytest
|
||||
from testcontainers.core.container import Network
|
||||
@@ -8,27 +10,32 @@ from fixtures.logger import setup_logger
|
||||
logger = setup_logger(__name__)
|
||||
|
||||
|
||||
@pytest.fixture(name="migrator", scope="package")
|
||||
def migrator(
|
||||
def create_migrator(
|
||||
network: Network,
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
request: pytest.FixtureRequest,
|
||||
pytestconfig: pytest.Config,
|
||||
cache_key: str = "migrator",
|
||||
env_overrides: Optional[dict] = None,
|
||||
) -> types.Operation:
|
||||
"""
|
||||
Package-scoped fixture for running schema migrations.
|
||||
Factory function for running schema migrations.
|
||||
Accepts optional env_overrides to customize the migrator environment.
|
||||
"""
|
||||
|
||||
def create() -> None:
|
||||
version = request.config.getoption("--schema-migrator-version")
|
||||
client = docker.from_env()
|
||||
|
||||
environment = dict(env_overrides) if env_overrides else {}
|
||||
|
||||
container = client.containers.run(
|
||||
image=f"signoz/signoz-schema-migrator:{version}",
|
||||
command=f"sync --replication=true --cluster-name=cluster --up= --dsn={clickhouse.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN"]}",
|
||||
detach=True,
|
||||
auto_remove=False,
|
||||
network=network.id,
|
||||
environment=environment,
|
||||
)
|
||||
|
||||
result = container.wait()
|
||||
@@ -47,6 +54,7 @@ def migrator(
|
||||
detach=True,
|
||||
auto_remove=False,
|
||||
network=network.id,
|
||||
environment=environment,
|
||||
)
|
||||
|
||||
result = container.wait()
|
||||
@@ -59,7 +67,7 @@ def migrator(
|
||||
|
||||
container.remove()
|
||||
|
||||
return types.Operation(name="migrator")
|
||||
return types.Operation(name=cache_key)
|
||||
|
||||
def delete(_: types.Operation) -> None:
|
||||
pass
|
||||
@@ -70,9 +78,27 @@ def migrator(
|
||||
return dev.wrap(
|
||||
request,
|
||||
pytestconfig,
|
||||
"migrator",
|
||||
cache_key,
|
||||
lambda: types.Operation(name=""),
|
||||
create,
|
||||
delete,
|
||||
restore,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(name="migrator", scope="package")
|
||||
def migrator(
|
||||
network: Network,
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
request: pytest.FixtureRequest,
|
||||
pytestconfig: pytest.Config,
|
||||
) -> types.Operation:
|
||||
"""
|
||||
Package-scoped fixture for running schema migrations.
|
||||
"""
|
||||
return create_migrator(
|
||||
network=network,
|
||||
clickhouse=clickhouse,
|
||||
request=request,
|
||||
pytestconfig=pytestconfig,
|
||||
)
|
||||
|
||||
1181
tests/integration/src/querier_json_body/01_logs_json_body_new_qb.py
Normal file
1181
tests/integration/src/querier_json_body/01_logs_json_body_new_qb.py
Normal file
File diff suppressed because it is too large
Load Diff
0
tests/integration/src/querier_json_body/__init__.py
Normal file
0
tests/integration/src/querier_json_body/__init__.py
Normal file
70
tests/integration/src/querier_json_body/conftest.py
Normal file
70
tests/integration/src/querier_json_body/conftest.py
Normal file
@@ -0,0 +1,70 @@
|
||||
import pytest
|
||||
from testcontainers.core.container import Network
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.migrator import create_migrator
|
||||
from fixtures.signoz import create_signoz
|
||||
|
||||
UNSUPPORTED_CLICKHOUSE_VERSIONS = {"25.5.6"}
|
||||
|
||||
|
||||
def pytest_collection_modifyitems(
|
||||
config: pytest.Config, items: list[pytest.Item]
|
||||
) -> None:
|
||||
version = config.getoption("--clickhouse-version")
|
||||
if version in UNSUPPORTED_CLICKHOUSE_VERSIONS:
|
||||
skip = pytest.mark.skip(
|
||||
reason=f"JSON body QB tests require ClickHouse > {version}"
|
||||
)
|
||||
for item in items:
|
||||
item.add_marker(skip)
|
||||
|
||||
|
||||
@pytest.fixture(name="migrator", scope="package")
|
||||
def migrator_json(
|
||||
network: Network,
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
request: pytest.FixtureRequest,
|
||||
pytestconfig: pytest.Config,
|
||||
) -> types.Operation:
|
||||
"""
|
||||
Package-scoped migrator with ENABLE_LOGS_MIGRATIONS_V2=1.
|
||||
"""
|
||||
return create_migrator(
|
||||
network=network,
|
||||
clickhouse=clickhouse,
|
||||
request=request,
|
||||
pytestconfig=pytestconfig,
|
||||
cache_key="migrator-json-body",
|
||||
env_overrides={
|
||||
"ENABLE_LOGS_MIGRATIONS_V2": "1",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(name="signoz", scope="package")
|
||||
def signoz_json_body(
|
||||
network: Network,
|
||||
zeus: types.TestContainerDocker,
|
||||
gateway: types.TestContainerDocker,
|
||||
sqlstore: types.TestContainerSQL,
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
request: pytest.FixtureRequest,
|
||||
pytestconfig: pytest.Config,
|
||||
) -> types.SigNoz:
|
||||
"""
|
||||
Package-scoped fixture for SigNoz with BODY_JSON_QUERY_ENABLED=true.
|
||||
"""
|
||||
return create_signoz(
|
||||
network=network,
|
||||
zeus=zeus,
|
||||
gateway=gateway,
|
||||
sqlstore=sqlstore,
|
||||
clickhouse=clickhouse,
|
||||
request=request,
|
||||
pytestconfig=pytestconfig,
|
||||
cache_key="signoz-json-body",
|
||||
env_overrides={
|
||||
"BODY_JSON_QUERY_ENABLED": "true",
|
||||
},
|
||||
)
|
||||
Reference in New Issue
Block a user