Mirror of https://github.com/SigNoz/signoz.git (synced 2026-03-24 13:20:27 +00:00)

Compare commits: platform-p...refactor/c (69 commits)
| Author | SHA1 | Date |
|---|---|---|
| | ee5d182539 | |
| | 531979543c | |
| | 4b09f057b9 | |
| | dde7c79b4d | |
| | 0bc12f02bc | |
| | c95523c747 | |
| | e5f00421fe | |
| | 539252e10c | |
| | d65f426254 | |
| | 6e52f2c8f0 | |
| | d9f8a4ae5a | |
| | eefe3edffd | |
| | 2051861a03 | |
| | 4b01a40fb9 | |
| | 2d8a00bf18 | |
| | f1b26b310f | |
| | 2c438b6c32 | |
| | 1814c2d13c | |
| | e6cd771f11 | |
| | 6b94f87ca0 | |
| | bf315253ae | |
| | 668ff7bc39 | |
| | 07f2aa52fd | |
| | 3416b3ad55 | |
| | d6caa4f2c7 | |
| | f86371566d | |
| | 9115803084 | |
| | 0c14d8f966 | |
| | 7afb461af8 | |
| | a21fbb4ee0 | |
| | 0369842f3d | |
| | 59cd96562a | |
| | cc4475cab7 | |
| | ac8c648420 | |
| | bede6be4b8 | |
| | dd3d60e6df | |
| | 538ab686d2 | |
| | 936a325cb9 | |
| | c6cdcd0143 | |
| | cd9211d718 | |
| | 0601c28782 | |
| | 580610dbfa | |
| | 2d2aa02a81 | |
| | dd9723ad13 | |
| | 3651469416 | |
| | febce75734 | |
| | e1616f3487 | |
| | 4b94287ac7 | |
| | 1575c7c54c | |
| | 8def3f835b | |
| | 11ed15f4c5 | |
| | f47877cca9 | |
| | bb2b9215ba | |
| | 3111904223 | |
| | 003e2c30d8 | |
| | 00fe516d10 | |
| | 0305f4f7db | |
| | c60019a6dc | |
| | acde2a37fa | |
| | 945241a52a | |
| | e967f80c86 | |
| | a09dc325de | |
| | 379b4f7fc4 | |
| | 5e536ae077 | |
| | 234585e642 | |
| | 2cc14f1ad4 | |
| | dc4ed4d239 | |
| | 7281c36873 | |
| | 40288776e8 | |
.vscode/settings.json (vendored, 4 lines changed)

@@ -17,5 +17,7 @@
    },
    "[html]": {
        "editor.defaultFormatter": "vscode.html-language-features"
    }
  },
  "python-envs.defaultEnvManager": "ms-python.python:system",
  "python-envs.pythonProjects": []
}
docs/api/openapi.yml (1010 lines changed)

File diff suppressed because it is too large.
@@ -123,6 +123,7 @@ if err := router.Handle("/api/v1/things", handler.New(
		Description:         "This endpoint creates a thing",
		Request:             new(types.PostableThing),
		RequestContentType:  "application/json",
		RequestQuery:        new(types.QueryableThing),
		Response:            new(types.GettableThing),
		ResponseContentType: "application/json",
		SuccessStatusCode:   http.StatusCreated,

@@ -155,6 +156,8 @@ The `handler.New` function ties the HTTP handler to OpenAPI metadata via `OpenAPIDef`:

- **Request / RequestContentType**:
  - `Request` is a Go type that describes the request body or form.
  - `RequestContentType` is usually `"application/json"` or `"application/x-www-form-urlencoded"` (for callbacks like SAML).
- **RequestQuery**:
  - `RequestQuery` is a Go type that describes URL query parameters (see the sketch after this list).
- **RequestExamples**: An array of `handler.OpenAPIExample` entries that provide concrete request payloads in the generated spec. See [Adding request examples](#adding-request-examples) below.
- **Response / ResponseContentType**:
  - `Response` is the Go type for the successful response payload.
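For illustration, the query type referenced by `RequestQuery` could be a plain struct whose exported fields each map to one URL query parameter. The sketch below is hypothetical: `types.QueryableThing` is not defined in this diff, and the struct tag read by the binder is an assumption.

	// Hypothetical sketch of a RequestQuery type; the `query` struct tag is an
	// assumption; check pkg/http/binding for the tag the binder actually reads.
	package types

	// QueryableThing describes the URL query parameters accepted by the
	// "things" endpoint. Each field becomes one documented query parameter.
	type QueryableThing struct {
		// Limit caps the number of things returned, e.g. ?limit=10.
		Limit int `query:"limit"`
		// Status filters things by status, e.g. ?status=active.
		Status string `query:"status"`
	}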
frontend/src/api/generated/services/cloudintegration/index.ts (new file, 1041 lines)

File diff suppressed because it is too large.
@@ -20,11 +20,113 @@ import { useMutation, useQuery } from 'react-query';
import type { BodyType, ErrorType } from '../../../generatedAPIInstance';
import { GeneratedAPIInstance } from '../../../generatedAPIInstance';
import type {
  HandleExportRawDataPOSTParams,
  ListPromotedAndIndexedPaths200,
  PromotetypesPromotePathDTO,
  Querybuildertypesv5QueryRangeRequestDTO,
  RenderErrorResponseDTO,
} from '../sigNoz.schemas';

/**
 * This endpoint allows complex queries for exporting raw data for traces and logs
 * @summary Export raw data
 */
export const handleExportRawDataPOST = (
  querybuildertypesv5QueryRangeRequestDTO: BodyType<Querybuildertypesv5QueryRangeRequestDTO>,
  params?: HandleExportRawDataPOSTParams,
  signal?: AbortSignal,
) => {
  return GeneratedAPIInstance<string>({
    url: `/api/v1/export_raw_data`,
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    data: querybuildertypesv5QueryRangeRequestDTO,
    params,
    signal,
  });
};

export const getHandleExportRawDataPOSTMutationOptions = <
  TError = ErrorType<RenderErrorResponseDTO>,
  TContext = unknown
>(options?: {
  mutation?: UseMutationOptions<
    Awaited<ReturnType<typeof handleExportRawDataPOST>>,
    TError,
    {
      data: BodyType<Querybuildertypesv5QueryRangeRequestDTO>;
      params?: HandleExportRawDataPOSTParams;
    },
    TContext
  >;
}): UseMutationOptions<
  Awaited<ReturnType<typeof handleExportRawDataPOST>>,
  TError,
  {
    data: BodyType<Querybuildertypesv5QueryRangeRequestDTO>;
    params?: HandleExportRawDataPOSTParams;
  },
  TContext
> => {
  const mutationKey = ['handleExportRawDataPOST'];
  const { mutation: mutationOptions } = options
    ? options.mutation &&
      'mutationKey' in options.mutation &&
      options.mutation.mutationKey
      ? options
      : { ...options, mutation: { ...options.mutation, mutationKey } }
    : { mutation: { mutationKey } };

  const mutationFn: MutationFunction<
    Awaited<ReturnType<typeof handleExportRawDataPOST>>,
    {
      data: BodyType<Querybuildertypesv5QueryRangeRequestDTO>;
      params?: HandleExportRawDataPOSTParams;
    }
  > = (props) => {
    const { data, params } = props ?? {};

    return handleExportRawDataPOST(data, params);
  };

  return { mutationFn, ...mutationOptions };
};

export type HandleExportRawDataPOSTMutationResult = NonNullable<
  Awaited<ReturnType<typeof handleExportRawDataPOST>>
>;
export type HandleExportRawDataPOSTMutationBody = BodyType<Querybuildertypesv5QueryRangeRequestDTO>;
export type HandleExportRawDataPOSTMutationError = ErrorType<RenderErrorResponseDTO>;

/**
 * @summary Export raw data
 */
export const useHandleExportRawDataPOST = <
  TError = ErrorType<RenderErrorResponseDTO>,
  TContext = unknown
>(options?: {
  mutation?: UseMutationOptions<
    Awaited<ReturnType<typeof handleExportRawDataPOST>>,
    TError,
    {
      data: BodyType<Querybuildertypesv5QueryRangeRequestDTO>;
      params?: HandleExportRawDataPOSTParams;
    },
    TContext
  >;
}): UseMutationResult<
  Awaited<ReturnType<typeof handleExportRawDataPOST>>,
  TError,
  {
    data: BodyType<Querybuildertypesv5QueryRangeRequestDTO>;
    params?: HandleExportRawDataPOSTParams;
  },
  TContext
> => {
  const mutationOptions = getHandleExportRawDataPOSTMutationOptions(options);

  return useMutation(mutationOptions);
};
/**
 * This endpoint promotes and indexes paths
 * @summary Promote and index paths
@@ -437,6 +437,436 @@ export interface AuthtypesUpdateableAuthDomainDTO {
  config?: AuthtypesAuthDomainConfigDTO;
}

export interface CloudintegrationtypesAWSAccountConfigDTO {
  /**
   * @type array
   */
  regions: string[];
}

export type CloudintegrationtypesAWSCollectionStrategyDTOS3Buckets = {
  [key: string]: string[];
};

export interface CloudintegrationtypesAWSCollectionStrategyDTO {
  aws_logs?: CloudintegrationtypesAWSLogsStrategyDTO;
  aws_metrics?: CloudintegrationtypesAWSMetricsStrategyDTO;
  /**
   * @type object
   */
  s3_buckets?: CloudintegrationtypesAWSCollectionStrategyDTOS3Buckets;
}

export interface CloudintegrationtypesAWSConnectionArtifactDTO {
  /**
   * @type string
   */
  connectionURL: string;
}

export interface CloudintegrationtypesAWSConnectionArtifactRequestDTO {
  /**
   * @type string
   */
  deploymentRegion: string;
  /**
   * @type array
   */
  regions: string[];
}

export interface CloudintegrationtypesAWSIntegrationConfigDTO {
  /**
   * @type array
   */
  enabledRegions: string[];
  telemetry: CloudintegrationtypesAWSCollectionStrategyDTO;
}

export type CloudintegrationtypesAWSLogsStrategyDTOCloudwatchLogsSubscriptionsItem = {
  /**
   * @type string
   */
  filter_pattern?: string;
  /**
   * @type string
   */
  log_group_name_prefix?: string;
};

export interface CloudintegrationtypesAWSLogsStrategyDTO {
  /**
   * @type array
   * @nullable true
   */
  cloudwatch_logs_subscriptions?:
    | CloudintegrationtypesAWSLogsStrategyDTOCloudwatchLogsSubscriptionsItem[]
    | null;
}

export type CloudintegrationtypesAWSMetricsStrategyDTOCloudwatchMetricStreamFiltersItem = {
  /**
   * @type array
   */
  MetricNames?: string[];
  /**
   * @type string
   */
  Namespace?: string;
};

export interface CloudintegrationtypesAWSMetricsStrategyDTO {
  /**
   * @type array
   * @nullable true
   */
  cloudwatch_metric_stream_filters?:
    | CloudintegrationtypesAWSMetricsStrategyDTOCloudwatchMetricStreamFiltersItem[]
    | null;
}

export interface CloudintegrationtypesAWSServiceConfigDTO {
  logs?: CloudintegrationtypesAWSServiceLogsConfigDTO;
  metrics?: CloudintegrationtypesAWSServiceMetricsConfigDTO;
}

export type CloudintegrationtypesAWSServiceLogsConfigDTOS3Buckets = {
  [key: string]: string[];
};

export interface CloudintegrationtypesAWSServiceLogsConfigDTO {
  /**
   * @type boolean
   */
  enabled?: boolean;
  /**
   * @type object
   */
  s3_buckets?: CloudintegrationtypesAWSServiceLogsConfigDTOS3Buckets;
}

export interface CloudintegrationtypesAWSServiceMetricsConfigDTO {
  /**
   * @type boolean
   */
  enabled?: boolean;
}

export interface CloudintegrationtypesAccountDTO {
  agentReport: CloudintegrationtypesAgentReportDTO;
  config: CloudintegrationtypesAccountConfigDTO;
  /**
   * @type string
   * @format date-time
   */
  createdAt?: Date;
  /**
   * @type string
   */
  id: string;
  /**
   * @type string
   */
  orgId: string;
  /**
   * @type string
   */
  provider: string;
  /**
   * @type string
   * @nullable true
   */
  providerAccountId: string | null;
  /**
   * @type string
   * @format date-time
   * @nullable true
   */
  removedAt: Date | null;
  /**
   * @type string
   * @format date-time
   */
  updatedAt?: Date;
}

export interface CloudintegrationtypesAccountConfigDTO {
  aws: CloudintegrationtypesAWSAccountConfigDTO;
}

/**
 * @nullable
 */
export type CloudintegrationtypesAgentReportDTOData = {
  [key: string]: unknown;
} | null;

/**
 * @nullable
 */
export type CloudintegrationtypesAgentReportDTO = {
  /**
   * @type object
   * @nullable true
   */
  data: CloudintegrationtypesAgentReportDTOData;
  /**
   * @type integer
   * @format int64
   */
  timestampMillis: number;
} | null;

export interface CloudintegrationtypesAssetsDTO {
  /**
   * @type array
   * @nullable true
   */
  dashboards?: CloudintegrationtypesDashboardDTO[] | null;
}

export interface CloudintegrationtypesCollectedLogAttributeDTO {
  /**
   * @type string
   */
  name?: string;
  /**
   * @type string
   */
  path?: string;
  /**
   * @type string
   */
  type?: string;
}

export interface CloudintegrationtypesCollectedMetricDTO {
  /**
   * @type string
   */
  description?: string;
  /**
   * @type string
   */
  name?: string;
  /**
   * @type string
   */
  type?: string;
  /**
   * @type string
   */
  unit?: string;
}

export interface CloudintegrationtypesCollectionStrategyDTO {
  aws: CloudintegrationtypesAWSCollectionStrategyDTO;
}

export interface CloudintegrationtypesConnectionArtifactDTO {
  aws: CloudintegrationtypesAWSConnectionArtifactDTO;
}

export interface CloudintegrationtypesConnectionArtifactRequestDTO {
  aws: CloudintegrationtypesAWSConnectionArtifactRequestDTO;
}

export interface CloudintegrationtypesDashboardDTO {
  definition?: DashboardtypesStorableDashboardDataDTO;
  /**
   * @type string
   */
  description?: string;
  /**
   * @type string
   */
  id?: string;
  /**
   * @type string
   */
  title?: string;
}

export interface CloudintegrationtypesDataCollectedDTO {
  /**
   * @type array
   * @nullable true
   */
  logs?: CloudintegrationtypesCollectedLogAttributeDTO[] | null;
  /**
   * @type array
   * @nullable true
   */
  metrics?: CloudintegrationtypesCollectedMetricDTO[] | null;
}

export interface CloudintegrationtypesGettableAccountWithArtifactDTO {
  connectionArtifact: CloudintegrationtypesConnectionArtifactDTO;
  /**
   * @type string
   */
  id: string;
}

export interface CloudintegrationtypesGettableAccountsDTO {
  /**
   * @type array
   */
  accounts: CloudintegrationtypesAccountDTO[];
}

export interface CloudintegrationtypesGettableAgentCheckInResponseDTO {
  /**
   * @type string
   */
  account_id: string;
  /**
   * @type string
   */
  cloud_account_id: string;
  /**
   * @type string
   */
  cloudIntegrationId: string;
  integration_config: CloudintegrationtypesIntegrationConfigDTO;
  integrationConfig: CloudintegrationtypesProviderIntegrationConfigDTO;
  /**
   * @type string
   */
  providerAccountId: string;
  /**
   * @type string
   * @format date-time
   * @nullable true
   */
  removed_at: Date | null;
  /**
   * @type string
   * @format date-time
   * @nullable true
   */
  removedAt: Date | null;
}

export interface CloudintegrationtypesGettableServicesMetadataDTO {
  /**
   * @type array
   */
  services: CloudintegrationtypesServiceMetadataDTO[];
}

/**
 * @nullable
 */
export type CloudintegrationtypesIntegrationConfigDTO = {
  /**
   * @type array
   */
  enabled_regions: string[];
  telemetry: CloudintegrationtypesAWSCollectionStrategyDTO;
} | null;

/**
 * @nullable
 */
export type CloudintegrationtypesPostableAgentCheckInRequestDTOData = {
  [key: string]: unknown;
} | null;

export interface CloudintegrationtypesPostableAgentCheckInRequestDTO {
  /**
   * @type string
   */
  account_id?: string;
  /**
   * @type string
   */
  cloud_account_id?: string;
  /**
   * @type string
   */
  cloudIntegrationId?: string;
  /**
   * @type object
   * @nullable true
   */
  data: CloudintegrationtypesPostableAgentCheckInRequestDTOData;
  /**
   * @type string
   */
  providerAccountId?: string;
}

export interface CloudintegrationtypesProviderIntegrationConfigDTO {
  aws: CloudintegrationtypesAWSIntegrationConfigDTO;
}

export interface CloudintegrationtypesServiceDTO {
  assets: CloudintegrationtypesAssetsDTO;
  dataCollected: CloudintegrationtypesDataCollectedDTO;
  /**
   * @type string
   */
  icon: string;
  /**
   * @type string
   */
  id: string;
  /**
   * @type string
   */
  overview: string;
  serviceConfig?: CloudintegrationtypesServiceConfigDTO;
  supported_signals: CloudintegrationtypesSupportedSignalsDTO;
  telemetryCollectionStrategy: CloudintegrationtypesCollectionStrategyDTO;
  /**
   * @type string
   */
  title: string;
}

export interface CloudintegrationtypesServiceConfigDTO {
  aws: CloudintegrationtypesAWSServiceConfigDTO;
}

export interface CloudintegrationtypesServiceMetadataDTO {
  /**
   * @type boolean
   */
  enabled: boolean;
  /**
   * @type string
   */
  icon: string;
  /**
   * @type string
   */
  id: string;
  /**
   * @type string
   */
  title: string;
}

export interface CloudintegrationtypesSupportedSignalsDTO {
  /**
   * @type boolean
   */
  logs?: boolean;
  /**
   * @type boolean
   */
  metrics?: boolean;
}

export interface CloudintegrationtypesUpdatableAccountDTO {
  config: CloudintegrationtypesAccountConfigDTO;
}

export interface CloudintegrationtypesUpdatableServiceDTO {
  config: CloudintegrationtypesServiceConfigDTO;
}

export interface DashboardtypesDashboardDTO {
  /**
   * @type string
@@ -2858,6 +3288,97 @@ export type AuthzResources200 = {
export type ChangePasswordPathParameters = {
  id: string;
};
export type AgentCheckInDeprecatedPathParameters = {
  cloudProvider: string;
};
export type AgentCheckInDeprecated200 = {
  data: CloudintegrationtypesGettableAgentCheckInResponseDTO;
  /**
   * @type string
   */
  status: string;
};

export type ListAccountsPathParameters = {
  cloudProvider: string;
};
export type ListAccounts200 = {
  data: CloudintegrationtypesGettableAccountsDTO;
  /**
   * @type string
   */
  status: string;
};

export type CreateAccountPathParameters = {
  cloudProvider: string;
};
export type CreateAccount200 = {
  data: CloudintegrationtypesGettableAccountWithArtifactDTO;
  /**
   * @type string
   */
  status: string;
};

export type DisconnectAccountPathParameters = {
  cloudProvider: string;
  id: string;
};
export type GetAccountPathParameters = {
  cloudProvider: string;
  id: string;
};
export type GetAccount200 = {
  data: CloudintegrationtypesAccountDTO;
  /**
   * @type string
   */
  status: string;
};

export type UpdateAccountPathParameters = {
  cloudProvider: string;
  id: string;
};
export type AgentCheckInPathParameters = {
  cloudProvider: string;
};
export type AgentCheckIn200 = {
  data: CloudintegrationtypesGettableAgentCheckInResponseDTO;
  /**
   * @type string
   */
  status: string;
};

export type ListServicesMetadataPathParameters = {
  cloudProvider: string;
};
export type ListServicesMetadata200 = {
  data: CloudintegrationtypesGettableServicesMetadataDTO;
  /**
   * @type string
   */
  status: string;
};

export type GetServicePathParameters = {
  cloudProvider: string;
  serviceId: string;
};
export type GetService200 = {
  data: CloudintegrationtypesServiceDTO;
  /**
   * @type string
   */
  status: string;
};

export type UpdateServicePathParameters = {
  cloudProvider: string;
  serviceId: string;
};
export type CreateSessionByGoogleCallback303 = {
  data: AuthtypesGettableTokenDTO;
  /**
@@ -2959,6 +3480,19 @@ export type DeleteAuthDomainPathParameters = {
export type UpdateAuthDomainPathParameters = {
  id: string;
};
export type HandleExportRawDataPOSTParams = {
  /**
   * @enum csv,jsonl
   * @type string
   * @description The output format for the export.
   */
  format?: HandleExportRawDataPOSTFormat;
};

export enum HandleExportRawDataPOSTFormat {
  csv = 'csv',
  jsonl = 'jsonl',
}
export type GetFieldsKeysParams = {
  /**
   * @description undefined
@@ -62,9 +62,6 @@ export const getVolumeQueryPayload = (
  const k8sPVCNameKey = dotMetricsEnabled
    ? 'k8s.persistentvolumeclaim.name'
    : 'k8s_persistentvolumeclaim_name';
  const legendTemplate = dotMetricsEnabled
    ? '{{k8s.namespace.name}}-{{k8s.pod.name}}'
    : '{{k8s_namespace_name}}-{{k8s_pod_name}}';

  return [
    {
@@ -136,7 +133,7 @@ export const getVolumeQueryPayload = (
          functions: [],
          groupBy: [],
          having: [],
          legend: legendTemplate,
          legend: 'Available',
          limit: null,
          orderBy: [],
          queryName: 'A',
@@ -228,7 +225,7 @@ export const getVolumeQueryPayload = (
          functions: [],
          groupBy: [],
          having: [],
          legend: legendTemplate,
          legend: 'Capacity',
          limit: null,
          orderBy: [],
          queryName: 'A',
@@ -319,7 +316,7 @@ export const getVolumeQueryPayload = (
          },
          groupBy: [],
          having: [],
          legend: legendTemplate,
          legend: 'Inodes Used',
          limit: null,
          orderBy: [],
          queryName: 'A',
@@ -411,7 +408,7 @@ export const getVolumeQueryPayload = (
          },
          groupBy: [],
          having: [],
          legend: legendTemplate,
          legend: 'Total Inodes',
          limit: null,
          orderBy: [],
          queryName: 'A',
@@ -503,7 +500,7 @@ export const getVolumeQueryPayload = (
          },
          groupBy: [],
          having: [],
          legend: legendTemplate,
          legend: 'Inodes Free',
          limit: null,
          orderBy: [],
          queryName: 'A',
pkg/apiserver/signozapiserver/cloudintegration.go (new file, 216 lines)
@@ -0,0 +1,216 @@
package signozapiserver

import (
	"net/http"

	"github.com/SigNoz/signoz/pkg/http/handler"
	"github.com/SigNoz/signoz/pkg/types"
	citypes "github.com/SigNoz/signoz/pkg/types/cloudintegrationtypes"
	"github.com/gorilla/mux"
)

func (provider *provider) addCloudIntegrationRoutes(router *mux.Router) error {
	if err := router.Handle("/api/v1/cloud_integrations/{cloud_provider}/accounts", handler.New(
		provider.authZ.AdminAccess(provider.cloudIntegrationHandler.CreateAccount),
		handler.OpenAPIDef{
			ID:                  "CreateAccount",
			Tags:                []string{"cloudintegration"},
			Summary:             "Create account",
			Description:         "This endpoint creates a new cloud integration account for the specified cloud provider",
			Request:             new(citypes.PostableConnectionArtifact),
			RequestContentType:  "application/json",
			Response:            new(citypes.GettableAccountWithArtifact),
			ResponseContentType: "application/json",
			SuccessStatusCode:   http.StatusOK,
			ErrorStatusCodes:    []int{},
			Deprecated:          false,
			SecuritySchemes:     newSecuritySchemes(types.RoleAdmin),
		},
	)).Methods(http.MethodPost).GetError(); err != nil {
		return err
	}

	if err := router.Handle("/api/v1/cloud_integrations/{cloud_provider}/accounts", handler.New(
		provider.authZ.AdminAccess(provider.cloudIntegrationHandler.ListAccounts),
		handler.OpenAPIDef{
			ID:                  "ListAccounts",
			Tags:                []string{"cloudintegration"},
			Summary:             "List accounts",
			Description:         "This endpoint lists the accounts for the specified cloud provider",
			Request:             nil,
			RequestContentType:  "",
			Response:            new(citypes.GettableAccounts),
			ResponseContentType: "application/json",
			SuccessStatusCode:   http.StatusOK,
			ErrorStatusCodes:    []int{},
			Deprecated:          false,
			SecuritySchemes:     newSecuritySchemes(types.RoleAdmin),
		},
	)).Methods(http.MethodGet).GetError(); err != nil {
		return err
	}

	if err := router.Handle("/api/v1/cloud_integrations/{cloud_provider}/accounts/{id}", handler.New(
		provider.authZ.AdminAccess(provider.cloudIntegrationHandler.GetAccount),
		handler.OpenAPIDef{
			ID:                  "GetAccount",
			Tags:                []string{"cloudintegration"},
			Summary:             "Get account",
			Description:         "This endpoint gets an account for the specified cloud provider",
			Request:             nil,
			RequestContentType:  "",
			Response:            new(citypes.GettableAccount),
			ResponseContentType: "application/json",
			SuccessStatusCode:   http.StatusOK,
			ErrorStatusCodes:    []int{http.StatusBadRequest, http.StatusNotFound},
			Deprecated:          false,
			SecuritySchemes:     newSecuritySchemes(types.RoleAdmin),
		},
	)).Methods(http.MethodGet).GetError(); err != nil {
		return err
	}

	if err := router.Handle("/api/v1/cloud_integrations/{cloud_provider}/accounts/{id}", handler.New(
		provider.authZ.AdminAccess(provider.cloudIntegrationHandler.UpdateAccount),
		handler.OpenAPIDef{
			ID:                  "UpdateAccount",
			Tags:                []string{"cloudintegration"},
			Summary:             "Update account",
			Description:         "This endpoint updates an account for the specified cloud provider",
			Request:             new(citypes.UpdatableAccount),
			RequestContentType:  "application/json",
			Response:            nil,
			ResponseContentType: "",
			SuccessStatusCode:   http.StatusNoContent,
			ErrorStatusCodes:    []int{},
			Deprecated:          false,
			SecuritySchemes:     newSecuritySchemes(types.RoleAdmin),
		},
	)).Methods(http.MethodPut).GetError(); err != nil {
		return err
	}

	if err := router.Handle("/api/v1/cloud_integrations/{cloud_provider}/accounts/{id}", handler.New(
		provider.authZ.AdminAccess(provider.cloudIntegrationHandler.DisconnectAccount),
		handler.OpenAPIDef{
			ID:                  "DisconnectAccount",
			Tags:                []string{"cloudintegration"},
			Summary:             "Disconnect account",
			Description:         "This endpoint disconnects an account for the specified cloud provider",
			Request:             nil,
			RequestContentType:  "",
			Response:            nil,
			ResponseContentType: "",
			SuccessStatusCode:   http.StatusNoContent,
			ErrorStatusCodes:    []int{},
			Deprecated:          false,
			SecuritySchemes:     newSecuritySchemes(types.RoleAdmin),
		},
	)).Methods(http.MethodDelete).GetError(); err != nil {
		return err
	}

	if err := router.Handle("/api/v1/cloud_integrations/{cloud_provider}/services", handler.New(
		provider.authZ.AdminAccess(provider.cloudIntegrationHandler.ListServicesMetadata),
		handler.OpenAPIDef{
			ID:                  "ListServicesMetadata",
			Tags:                []string{"cloudintegration"},
			Summary:             "List services metadata",
			Description:         "This endpoint lists the services metadata for the specified cloud provider",
			Request:             nil,
			RequestContentType:  "",
			Response:            new(citypes.GettableServicesMetadata),
			ResponseContentType: "application/json",
			SuccessStatusCode:   http.StatusOK,
			ErrorStatusCodes:    []int{},
			Deprecated:          false,
			SecuritySchemes:     newSecuritySchemes(types.RoleAdmin),
		},
	)).Methods(http.MethodGet).GetError(); err != nil {
		return err
	}

	if err := router.Handle("/api/v1/cloud_integrations/{cloud_provider}/services/{service_id}", handler.New(
		provider.authZ.AdminAccess(provider.cloudIntegrationHandler.GetService),
		handler.OpenAPIDef{
			ID:                  "GetService",
			Tags:                []string{"cloudintegration"},
			Summary:             "Get service",
			Description:         "This endpoint gets a service for the specified cloud provider",
			Request:             nil,
			RequestContentType:  "",
			Response:            new(citypes.GettableService),
			ResponseContentType: "application/json",
			SuccessStatusCode:   http.StatusOK,
			ErrorStatusCodes:    []int{},
			Deprecated:          false,
			SecuritySchemes:     newSecuritySchemes(types.RoleAdmin),
		},
	)).Methods(http.MethodGet).GetError(); err != nil {
		return err
	}

	if err := router.Handle("/api/v1/cloud_integrations/{cloud_provider}/services/{service_id}", handler.New(
		provider.authZ.AdminAccess(provider.cloudIntegrationHandler.UpdateService),
		handler.OpenAPIDef{
			ID:                  "UpdateService",
			Tags:                []string{"cloudintegration"},
			Summary:             "Update service",
			Description:         "This endpoint updates a service for the specified cloud provider",
			Request:             new(citypes.UpdatableService),
			RequestContentType:  "application/json",
			Response:            nil,
			ResponseContentType: "",
			SuccessStatusCode:   http.StatusNoContent,
			ErrorStatusCodes:    []int{},
			Deprecated:          false,
			SecuritySchemes:     newSecuritySchemes(types.RoleAdmin),
		},
	)).Methods(http.MethodPut).GetError(); err != nil {
		return err
	}

	// The agent check-in endpoint is kept the same as the older one to maintain backward compatibility with already-deployed agents.
	// In the future, this endpoint will be deprecated and a new one will be introduced for consistency with the endpoints above.
	if err := router.Handle("/api/v1/cloud-integrations/{cloud_provider}/agent-check-in", handler.New(
		provider.authZ.ViewAccess(provider.cloudIntegrationHandler.AgentCheckIn),
		handler.OpenAPIDef{
			ID:                  "AgentCheckInDeprecated",
			Tags:                []string{"cloudintegration"},
			Summary:             "Agent check-in",
			Description:         "[Deprecated] This endpoint is called by the deployed agent to check in",
			Request:             new(citypes.PostableAgentCheckInRequest),
			RequestContentType:  "application/json",
			Response:            new(citypes.GettableAgentCheckInResponse),
			ResponseContentType: "application/json",
			SuccessStatusCode:   http.StatusOK,
			ErrorStatusCodes:    []int{},
			Deprecated:          true, // this endpoint will be deprecated in future
			SecuritySchemes:     newSecuritySchemes(types.RoleViewer), // agent role is viewer
		},
	)).Methods(http.MethodPost).GetError(); err != nil {
		return err
	}

	if err := router.Handle("/api/v1/cloud_integrations/{cloud_provider}/accounts/check_in", handler.New(
		provider.authZ.ViewAccess(provider.cloudIntegrationHandler.AgentCheckIn),
		handler.OpenAPIDef{
			ID:                  "AgentCheckIn",
			Tags:                []string{"cloudintegration"},
			Summary:             "Agent check-in",
			Description:         "This endpoint is called by the deployed agent to check in",
			Request:             new(citypes.PostableAgentCheckInRequest),
			RequestContentType:  "application/json",
			Response:            new(citypes.GettableAgentCheckInResponse),
			ResponseContentType: "application/json",
			SuccessStatusCode:   http.StatusOK,
			ErrorStatusCodes:    []int{},
			Deprecated:          false,
			SecuritySchemes:     newSecuritySchemes(types.RoleViewer), // agent role is viewer
		},
	)).Methods(http.MethodPost).GetError(); err != nil {
		return err
	}

	return nil
}
@@ -12,12 +12,14 @@ import (
	"github.com/SigNoz/signoz/pkg/http/handler"
	"github.com/SigNoz/signoz/pkg/http/middleware"
	"github.com/SigNoz/signoz/pkg/modules/authdomain"
	"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
	"github.com/SigNoz/signoz/pkg/modules/dashboard"
	"github.com/SigNoz/signoz/pkg/modules/fields"
	"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
	"github.com/SigNoz/signoz/pkg/modules/organization"
	"github.com/SigNoz/signoz/pkg/modules/preference"
	"github.com/SigNoz/signoz/pkg/modules/promote"
	"github.com/SigNoz/signoz/pkg/modules/rawdataexport"
	"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
	"github.com/SigNoz/signoz/pkg/modules/session"
	"github.com/SigNoz/signoz/pkg/modules/user"
@@ -29,28 +31,30 @@ import (
)

type provider struct {
	config                 apiserver.Config
	settings               factory.ScopedProviderSettings
	router                 *mux.Router
	authZ                  *middleware.AuthZ
	orgHandler             organization.Handler
	userHandler            user.Handler
	sessionHandler         session.Handler
	authDomainHandler      authdomain.Handler
	preferenceHandler      preference.Handler
	globalHandler          global.Handler
	promoteHandler         promote.Handler
	flaggerHandler         flagger.Handler
	dashboardModule        dashboard.Module
	dashboardHandler       dashboard.Handler
	metricsExplorerHandler metricsexplorer.Handler
	gatewayHandler         gateway.Handler
	fieldsHandler          fields.Handler
	authzHandler           authz.Handler
	zeusHandler            zeus.Handler
	querierHandler         querier.Handler
	serviceAccountHandler  serviceaccount.Handler
	factoryHandler         factory.Handler
	config                  apiserver.Config
	settings                factory.ScopedProviderSettings
	router                  *mux.Router
	authZ                   *middleware.AuthZ
	orgHandler              organization.Handler
	userHandler             user.Handler
	sessionHandler          session.Handler
	authDomainHandler       authdomain.Handler
	preferenceHandler       preference.Handler
	globalHandler           global.Handler
	promoteHandler          promote.Handler
	flaggerHandler          flagger.Handler
	dashboardModule         dashboard.Module
	dashboardHandler        dashboard.Handler
	metricsExplorerHandler  metricsexplorer.Handler
	gatewayHandler          gateway.Handler
	fieldsHandler           fields.Handler
	authzHandler            authz.Handler
	rawDataExportHandler    rawdataexport.Handler
	zeusHandler             zeus.Handler
	querierHandler          querier.Handler
	serviceAccountHandler   serviceaccount.Handler
	factoryHandler          factory.Handler
	cloudIntegrationHandler cloudintegration.Handler
}

func NewFactory(
@@ -70,10 +74,12 @@ func NewFactory(
	gatewayHandler gateway.Handler,
	fieldsHandler fields.Handler,
	authzHandler authz.Handler,
	rawDataExportHandler rawdataexport.Handler,
	zeusHandler zeus.Handler,
	querierHandler querier.Handler,
	serviceAccountHandler serviceaccount.Handler,
	factoryHandler factory.Handler,
	cloudIntegrationHandler cloudintegration.Handler,
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
	return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
		return newProvider(
@@ -96,10 +102,12 @@ func NewFactory(
			gatewayHandler,
			fieldsHandler,
			authzHandler,
			rawDataExportHandler,
			zeusHandler,
			querierHandler,
			serviceAccountHandler,
			factoryHandler,
			cloudIntegrationHandler,
		)
	})
}
@@ -124,36 +132,40 @@ func newProvider(
	gatewayHandler gateway.Handler,
	fieldsHandler fields.Handler,
	authzHandler authz.Handler,
	rawDataExportHandler rawdataexport.Handler,
	zeusHandler zeus.Handler,
	querierHandler querier.Handler,
	serviceAccountHandler serviceaccount.Handler,
	factoryHandler factory.Handler,
	cloudIntegrationHandler cloudintegration.Handler,
) (apiserver.APIServer, error) {
	settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
	router := mux.NewRouter().UseEncodedPath()

	provider := &provider{
		config:                 config,
		settings:               settings,
		router:                 router,
		orgHandler:             orgHandler,
		userHandler:            userHandler,
		sessionHandler:         sessionHandler,
		authDomainHandler:      authDomainHandler,
		preferenceHandler:      preferenceHandler,
		globalHandler:          globalHandler,
		promoteHandler:         promoteHandler,
		flaggerHandler:         flaggerHandler,
		dashboardModule:        dashboardModule,
		dashboardHandler:       dashboardHandler,
		metricsExplorerHandler: metricsExplorerHandler,
		gatewayHandler:         gatewayHandler,
		fieldsHandler:          fieldsHandler,
		authzHandler:           authzHandler,
		zeusHandler:            zeusHandler,
		querierHandler:         querierHandler,
		serviceAccountHandler:  serviceAccountHandler,
		factoryHandler:         factoryHandler,
		config:                  config,
		settings:                settings,
		router:                  router,
		orgHandler:              orgHandler,
		userHandler:             userHandler,
		sessionHandler:          sessionHandler,
		authDomainHandler:       authDomainHandler,
		preferenceHandler:       preferenceHandler,
		globalHandler:           globalHandler,
		promoteHandler:          promoteHandler,
		flaggerHandler:          flaggerHandler,
		dashboardModule:         dashboardModule,
		dashboardHandler:        dashboardHandler,
		metricsExplorerHandler:  metricsExplorerHandler,
		gatewayHandler:          gatewayHandler,
		fieldsHandler:           fieldsHandler,
		authzHandler:            authzHandler,
		rawDataExportHandler:    rawDataExportHandler,
		zeusHandler:             zeusHandler,
		querierHandler:          querierHandler,
		serviceAccountHandler:   serviceAccountHandler,
		factoryHandler:          factoryHandler,
		cloudIntegrationHandler: cloudIntegrationHandler,
	}

	provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
@@ -226,6 +238,10 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
		return err
	}

	if err := provider.addRawDataExportRoutes(router); err != nil {
		return err
	}

	if err := provider.addZeusRoutes(router); err != nil {
		return err
	}
@@ -242,6 +258,10 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
		return err
	}

	if err := provider.addCloudIntegrationRoutes(router); err != nil {
		return err
	}

	return nil
}
pkg/apiserver/signozapiserver/rawdataexport.go (new file, 33 lines)
@@ -0,0 +1,33 @@
package signozapiserver

import (
	"net/http"

	"github.com/SigNoz/signoz/pkg/http/handler"
	"github.com/SigNoz/signoz/pkg/types"
	"github.com/SigNoz/signoz/pkg/types/exporttypes"
	v5 "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/gorilla/mux"
)

func (provider *provider) addRawDataExportRoutes(router *mux.Router) error {
	if err := router.Handle("/api/v1/export_raw_data", handler.New(provider.authZ.ViewAccess(provider.rawDataExportHandler.ExportRawData), handler.OpenAPIDef{
		ID:                  "HandleExportRawDataPOST",
		Tags:                []string{"logs", "traces"},
		Summary:             "Export raw data",
		Description:         "This endpoint allows complex queries for exporting raw data for traces and logs",
		Request:             new(v5.QueryRangeRequest),
		RequestQuery:        new(exporttypes.ExportRawDataFormatQueryParam),
		RequestContentType:  "application/json",
		Response:            nil,
		ResponseContentType: "application/json",
		SuccessStatusCode:   http.StatusOK,
		ErrorStatusCodes:    []int{http.StatusBadRequest},
		SecuritySchemes:     newSecuritySchemes(types.RoleViewer),
	})).Methods(http.MethodPost).GetError(); err != nil {
		return err
	}

	return nil
}
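Since this route binds the request body to `v5.QueryRangeRequest` and reads the output format from the `format` query parameter, a client now POSTs the query spec as JSON instead of encoding everything in the URL. The standalone Go sketch below is illustrative only: the JSON field names in the payload, the address, and the API-key header are assumptions, not confirmed by this diff.

	// Illustrative client for POST /api/v1/export_raw_data. The payload mirrors
	// qbtypes.QueryRangeRequest only approximately; adjust field names, address,
	// and auth to your deployment.
	package main

	import (
		"bytes"
		"fmt"
		"io"
		"net/http"
	)

	func main() {
		// Hypothetical raw-logs query; the exact QueryRangeRequest JSON shape
		// is an assumption.
		body := []byte(`{
			"start": 1693612800000000000,
			"end": 1693699199000000000,
			"requestType": "raw",
			"compositeQuery": {"queries": [{"type": "builder_query", "spec": {"signal": "logs", "name": "raw", "limit": 100}}]}
		}`)

		req, err := http.NewRequest(http.MethodPost,
			"http://localhost:8080/api/v1/export_raw_data?format=jsonl",
			bytes.NewReader(body))
		if err != nil {
			panic(err)
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("SIGNOZ-API-KEY", "<your-key>") // header name is an assumption

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()

		// The handler streams JSONL (one object per line) and sets the
		// X-Response-Complete trailer when the stream finishes.
		data, err := io.ReadAll(resp.Body)
		if err != nil {
			panic(err)
		}
		fmt.Printf("status=%s, exported %d bytes\n", resp.Status, len(data))
	}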
@@ -53,7 +53,7 @@ type Module interface {
}

type Handler interface {
	GetConnectionArtifact(http.ResponseWriter, *http.Request)
	CreateAccount(http.ResponseWriter, *http.Request)
	ListAccounts(http.ResponseWriter, *http.Request)
	GetAccount(http.ResponseWriter, *http.Request)
	UpdateAccount(http.ResponseWriter, *http.Request)
pkg/modules/cloudintegration/implcloudintegration/handler.go (new file, 58 lines)
@@ -0,0 +1,58 @@
package implcloudintegration

import (
	"net/http"

	"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
)

type handler struct{}

func NewHandler() cloudintegration.Handler {
	return &handler{}
}

func (handler *handler) CreateAccount(writer http.ResponseWriter, request *http.Request) {
	// TODO implement me
	panic("implement me")
}

func (handler *handler) ListAccounts(writer http.ResponseWriter, request *http.Request) {
	// TODO implement me
	panic("implement me")
}

func (handler *handler) GetAccount(writer http.ResponseWriter, request *http.Request) {
	// TODO implement me
	panic("implement me")
}

func (handler *handler) UpdateAccount(writer http.ResponseWriter, request *http.Request) {
	// TODO implement me
	panic("implement me")
}

func (handler *handler) DisconnectAccount(writer http.ResponseWriter, request *http.Request) {
	// TODO implement me
	panic("implement me")
}

func (handler *handler) ListServicesMetadata(writer http.ResponseWriter, request *http.Request) {
	// TODO implement me
	panic("implement me")
}

func (handler *handler) GetService(writer http.ResponseWriter, request *http.Request) {
	// TODO implement me
	panic("implement me")
}

func (handler *handler) UpdateService(writer http.ResponseWriter, request *http.Request) {
	// TODO implement me
	panic("implement me")
}

func (handler *handler) AgentCheckIn(writer http.ResponseWriter, request *http.Request) {
	// TODO implement me
	panic("implement me")
}
@@ -6,20 +6,19 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/http/binding"
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
"github.com/SigNoz/signoz/pkg/modules/rawdataexport"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/exporttypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
@@ -31,129 +30,31 @@ func NewHandler(module rawdataexport.Module) rawdataexport.Handler {
|
||||
return &handler{module: module}
|
||||
}
|
||||
|
||||
// ExportRawData handles data export requests.
|
||||
//
|
||||
// API Documentation:
|
||||
// Endpoint: GET /api/v1/export_raw_data
|
||||
//
|
||||
// Query Parameters:
|
||||
//
|
||||
// - source (optional): Type of data to export ["logs" (default), "metrics", "traces"]
|
||||
// Note: Currently only "logs" is fully supported
|
||||
//
|
||||
// - format (optional): Output format ["csv" (default), "jsonl"]
|
||||
//
|
||||
// - start (required): Start time for query (Unix timestamp in nanoseconds)
|
||||
//
|
||||
// - end (required): End time for query (Unix timestamp in nanoseconds)
|
||||
//
|
||||
// - limit (optional): Maximum number of rows to export
|
||||
// Constraints: Must be positive and cannot exceed MAX_EXPORT_ROW_COUNT_LIMIT
|
||||
//
|
||||
// - filter (optional): Filter expression to apply to the query
|
||||
//
|
||||
// - columns (optional): Specific columns to include in export
|
||||
// Default: all columns are returned
|
||||
// Format: ["context.field:type", "context.field", "field"]
|
||||
//
|
||||
// - order_by (optional): Sorting specification ["column:direction" or "context.field:type:direction"]
|
||||
// Direction: "asc" or "desc"
|
||||
// Default: ["timestamp:desc", "id:desc"]
|
||||
//
|
||||
// Response Headers:
|
||||
// - Content-Type: "text/csv" or "application/x-ndjson"
|
||||
// - Content-Encoding: "gzip" (handled by HTTP middleware)
|
||||
// - Content-Disposition: "attachment; filename=\"data_exported.[format]\""
|
||||
// - Cache-Control: "no-cache"
|
||||
// - Vary: "Accept-Encoding"
|
||||
// - Transfer-Encoding: "chunked"
|
||||
// - Trailers: X-Response-Complete
|
||||
//
|
||||
// Response Format:
|
||||
//
|
||||
// CSV: Headers in first row, data in subsequent rows
|
||||
// JSONL: One JSON object per line
|
||||
//
|
||||
// Example Usage:
|
||||
//
|
||||
// Basic CSV export:
|
||||
// GET /api/v1/export_raw_data?start=1693612800000000000&end=1693699199000000000
|
||||
//
|
||||
// Export with columns and format:
|
||||
// GET /api/v1/export_raw_data?start=1693612800000000000&end=1693699199000000000&format=jsonl
|
||||
// &columns=timestamp&columns=severity&columns=message
|
||||
//
|
||||
// Export with filter and ordering:
|
||||
// GET /api/v1/export_raw_data?start=1693612800000000000&end=1693699199000000000
|
||||
// &filter=severity="error"&order_by=timestamp:desc&limit=1000
|
||||
func (handler *handler) ExportRawData(rw http.ResponseWriter, r *http.Request) {
|
||||
source, err := getExportQuerySource(r.URL.Query())
|
||||
if err != nil {
|
||||
var queryRangeRequest qbtypes.QueryRangeRequest
|
||||
|
||||
var formatParam exporttypes.ExportRawDataFormatQueryParam
|
||||
if err := binding.Query.BindQuery(r.URL.Query(), &formatParam); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
format := formatParam.Format
|
||||
if err := binding.JSON.BindBody(r.Body, &queryRangeRequest); err != nil {
|
||||
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid request body: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
if err := validateSpecForExport(&queryRangeRequest); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
switch source {
|
||||
case "logs":
|
||||
handler.exportLogs(rw, r)
|
||||
case "traces":
|
||||
handler.exportTraces(rw, r)
|
||||
case "metrics":
|
||||
handler.exportMetrics(rw, r)
|
||||
default:
|
||||
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid source: must be logs"))
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *handler) exportMetrics(rw http.ResponseWriter, r *http.Request) {
|
||||
render.Error(rw, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "metrics export is not yet supported"))
|
||||
}
|
||||
|
||||
func (handler *handler) exportTraces(rw http.ResponseWriter, r *http.Request) {
|
||||
render.Error(rw, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "traces export is not yet supported"))
|
||||
}
|
||||
|
||||
func (handler *handler) exportLogs(rw http.ResponseWriter, r *http.Request) {
|
||||
// Set up response headers
|
||||
rw.Header().Set("Cache-Control", "no-cache")
|
||||
rw.Header().Set("Vary", "Accept-Encoding") // Indicate that response varies based on Accept-Encoding
|
||||
rw.Header().Set("Access-Control-Expose-Headers", "Content-Disposition, X-Response-Complete")
|
||||
rw.Header().Set("Trailer", "X-Response-Complete")
|
||||
rw.Header().Set("Transfer-Encoding", "chunked")
|
||||
|
||||
queryParams := r.URL.Query()
|
||||
|
||||
startTime, endTime, err := getExportQueryTimeRange(queryParams)
|
||||
if err != nil {
|
||||
if err := validateAndApplyDefaultExportLimits(queryRangeRequest.CompositeQuery.Queries); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
limit, err := getExportQueryLimit(queryParams)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
format, err := getExportQueryFormat(queryParams)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Set appropriate content type and filename
|
||||
filename := fmt.Sprintf("data_exported_%s.%s", time.Now().Format("2006-01-02_150405"), format)
|
||||
rw.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filename))
|
||||
|
||||
filterExpression := queryParams.Get("filter")
|
||||
|
||||
orderByExpression, err := getExportQueryOrderBy(queryParams)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
columns := getExportQueryColumns(queryParams)
|
||||
queryRangeRequest.UseDefaultOrderBy()
|
||||
|
||||
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||
if err != nil {
|
||||
@@ -161,76 +62,98 @@ func (handler *handler) exportLogs(rw http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
orgID, err := valuer.NewUUID(claims.OrgID)
|
||||
if err != nil {
|
||||
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "orgID is invalid"))
|
||||
return
|
||||
}
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
|
||||
queryRangeRequest := qbtypes.QueryRangeRequest{
|
||||
Start: startTime,
|
||||
End: endTime,
|
||||
RequestType: qbtypes.RequestTypeRaw,
|
||||
CompositeQuery: qbtypes.CompositeQuery{
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: nil,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
setExportResponseHeaders(rw, format)
|
||||
|
||||
spec := qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Name: "raw",
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: filterExpression,
|
||||
},
|
||||
Limit: limit,
|
||||
Order: orderByExpression,
|
||||
}
|
||||
|
||||
spec.SelectFields = columns
|
||||
|
||||
queryRangeRequest.CompositeQuery.Queries[0].Spec = spec
|
||||
|
||||
// This will signal Export module to stop sending data
|
||||
doneChan := make(chan any)
|
||||
defer close(doneChan)
|
||||
rowChan, errChan := handler.module.ExportRawData(r.Context(), orgID, &queryRangeRequest, doneChan)
|
||||
|
||||
var isComplete bool
|
||||
isComplete, err := handler.executeExport(rowChan, errChan, format, rw)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
rw.Header().Set("X-Response-Complete", strconv.FormatBool(isComplete))
|
||||
}
|
||||
|
||||
// validateSpecForExport validates query specs
|
||||
func validateSpecForExport(req *qbtypes.QueryRangeRequest) error {
|
||||
|
||||
queries := req.CompositeQuery.Queries
|
||||
|
||||
// If the trace operator query is not present, and there are multiple queries, return an error
|
||||
if req.TraceOperatorQueryIndex() == -1 && len(queries) > 1 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "multiple queries not allowed without a trace operator query")
|
||||
}
|
||||
|
||||
for idx := range queries {
|
||||
switch spec := queries[idx].Spec.(type) {
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
||||
qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation],
|
||||
qbtypes.QueryBuilderTraceOperator:
|
||||
// Supported spec types
|
||||
default:
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported query at index %d type: %T", idx, spec)
|
||||
}
|
||||
}
|
||||
|
||||
opts := append(qbtypes.GetValidationOptions(req.RequestType), qbtypes.WithSkipLimitOffsetValidation())
|
||||
return req.Validate(opts...)
|
||||
}
|
||||
|
||||
func validateAndApplyDefaultExportLimits(queries []qbtypes.QueryEnvelope) error {
|
||||
for idx := range queries {
|
||||
limit := queries[idx].GetLimit()
|
||||
if limit == 0 {
|
||||
limit = DefaultExportRowCountLimit
|
||||
} else if limit < 0 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be positive")
|
||||
} else if limit > MaxExportRowCountLimit {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit cannot be more than %d", MaxExportRowCountLimit)
|
||||
}
|
||||
queries[idx].SetLimit(limit)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// setExportResponseHeaders sets common HTTP headers for export responses.
|
||||
func setExportResponseHeaders(rw http.ResponseWriter, format string) {
|
||||
rw.Header().Set("Cache-Control", "no-cache")
|
||||
rw.Header().Set("Vary", "Accept-Encoding")
|
||||
rw.Header().Set("Access-Control-Expose-Headers", "Content-Disposition, X-Response-Complete")
|
||||
rw.Header().Set("Trailer", "X-Response-Complete")
|
||||
rw.Header().Set("Transfer-Encoding", "chunked")
|
||||
filename := fmt.Sprintf("data_exported_%s.%s", time.Now().Format("2006-01-02_150405"), format)
|
||||
rw.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filename))
|
||||
}
|
||||
|
||||
// executeExport streams data from rowChan to the response writer in the specified format.
|
||||
func (handler *handler) executeExport(rowChan <-chan *qbtypes.RawRow, errChan <-chan error, format string, rw http.ResponseWriter) (bool, error) {
|
||||
switch format {
|
||||
case "csv", "":
|
||||
rw.Header().Set("Content-Type", "text/csv")
|
||||
csvWriter := csv.NewWriter(rw)
|
||||
isComplete, err = handler.exportLogsCSV(rowChan, errChan, csvWriter)
|
||||
isComplete, err := handler.exportRawDataCSV(rowChan, errChan, csvWriter)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
return false, err
|
||||
}
|
||||
csvWriter.Flush()
|
||||
return isComplete, nil
|
||||
case "jsonl":
|
||||
rw.Header().Set("Content-Type", "application/x-ndjson")
|
||||
isComplete, err = handler.exportLogsJSONL(rowChan, errChan, rw)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
return handler.exportRawDataJSONL(rowChan, errChan, rw)
|
||||
default:
|
||||
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid format: must be csv or jsonl"))
|
||||
return
|
||||
return false, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid format: must be csv or jsonl")
|
||||
}
|
||||
|
||||
rw.Header().Set("X-Response-Complete", strconv.FormatBool(isComplete))
|
||||
}
|
||||
|
||||
func (handler *handler) exportLogsCSV(rowChan <-chan *qbtypes.RawRow, errChan <-chan error, csvWriter *csv.Writer) (bool, error) {
|
||||
var header []string
|
||||
// exportRawDataCSV is a generic CSV export function that works with any raw data (logs, traces, etc.)
|
||||
func (handler *handler) exportRawDataCSV(rowChan <-chan *qbtypes.RawRow, errChan <-chan error, csvWriter *csv.Writer) (bool, error) {
|
||||
|
||||
headerToIndexMapping := make(map[string]int, len(header))
|
||||
var header []string
|
||||
headerToIndexMapping := make(map[string]int)
|
||||
|
||||
totalBytes := uint64(0)
|
||||
for {
|
||||
@@ -268,8 +191,8 @@ func (handler *handler) exportLogsCSV(rowChan <-chan *qbtypes.RawRow, errChan <-
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *handler) exportLogsJSONL(rowChan <-chan *qbtypes.RawRow, errChan <-chan error, writer io.Writer) (bool, error) {

// exportRawDataJSONL is a generic JSONL export function that works with any raw data (logs, traces, etc.)
func (handler *handler) exportRawDataJSONL(rowChan <-chan *qbtypes.RawRow, errChan <-chan error, writer io.Writer) (bool, error) {
	totalBytes := uint64(0)
	for {
		select {
@@ -277,9 +200,11 @@ func (handler *handler) exportLogsJSONL(rowChan <-chan *qbtypes.RawRow, errChan
			if !ok {
				return true, nil
			}
			// Handle JSON format (JSONL - one object per line)
			jsonBytes, _ := json.Marshal(row.Data)
			totalBytes += uint64(len(jsonBytes)) + 1 // +1 for newline
			jsonBytes, err := json.Marshal(row.Data)
			if err != nil {
				return false, errors.NewUnexpectedf(errors.CodeInternal, "error marshaling JSON: %s", err)
			}
			totalBytes += uint64(len(jsonBytes)) + 1

			if _, err := writer.Write(jsonBytes); err != nil {
				return false, errors.NewUnexpectedf(errors.CodeInternal, "error writing JSON: %s", err)
@@ -299,74 +224,33 @@ func (handler *handler) exportLogsJSONL(rowChan <-chan *qbtypes.RawRow, errChan
		}
	}
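For reference, the JSONL (NDJSON) framing the exporter produces is one JSON object per line, each terminated by a newline, which is why the byte counter adds one per record. A standalone illustration using only the standard library:

package main

import (
	"encoding/json"
	"os"
)

func main() {
	rows := []map[string]any{
		{"timestamp": 1700000000, "body": "first log line"},
		{"timestamp": 1700000001, "body": "second log line"},
	}
	for _, row := range rows {
		b, err := json.Marshal(row)
		if err != nil {
			panic(err)
		}
		os.Stdout.Write(b)
		os.Stdout.Write([]byte("\n")) // newline delimiter keeps the stream line-splittable
	}
}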
func getExportQuerySource(queryParams url.Values) (string, error) {
	switch queryParams.Get("source") {
	case "logs", "":
		return "logs", nil
	case "metrics":
		return "metrics", errors.NewInvalidInputf(errors.CodeInvalidInput, "metrics export not yet supported")
	case "traces":
		return "traces", errors.NewInvalidInputf(errors.CodeInvalidInput, "traces export not yet supported")
	default:
		return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid source: must be logs, metrics or traces")
	}
}

func getExportQueryFormat(queryParams url.Values) (string, error) {
	switch queryParams.Get("format") {
	case "csv", "":
		return "csv", nil
	case "jsonl":
		return "jsonl", nil
	default:
		return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid format: must be csv or jsonl")
	}
}

func getExportQueryLimit(queryParams url.Values) (int, error) {
	limitStr := queryParams.Get("limit")
	if limitStr == "" {
		return DefaultExportRowCountLimit, nil
	} else {
		limit, err := strconv.Atoi(limitStr)
		if err != nil {
			return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid limit format: %s", err.Error())
		}
		if limit <= 0 {
			return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be positive")
		}
		if limit > MaxExportRowCountLimit {
			return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "limit cannot be more than %d", MaxExportRowCountLimit)
		}
		return limit, nil
	}
}

func getExportQueryTimeRange(queryParams url.Values) (uint64, uint64, error) {
	startTimeStr := queryParams.Get("start")
	endTimeStr := queryParams.Get("end")

	if startTimeStr == "" || endTimeStr == "" {
		return 0, 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "start and end time are required")
	}
	startTime, err := strconv.ParseUint(startTimeStr, 10, 64)
	if err != nil {
		return 0, 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid start time format: %s", err.Error())
	}
	endTime, err := strconv.ParseUint(endTimeStr, 10, 64)
	if err != nil {
		return 0, 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid end time format: %s", err.Error())
	}
	return startTime, endTime, nil
}
// priorityColumns defines the columns that should appear first in the CSV output, in order.
var priorityColumns = []string{"timestamp", "id"}

func constructCSVHeaderFromQueryResponse(data map[string]any) []string {
	header := make([]string, 0, len(data))
	for key := range data {
		header = append(header, key)
	}
	// This is to ensure CSV output is consistent across multiple queries
	slices.SortFunc(header, func(a, b string) int {
		ai, bi := slices.Index(priorityColumns, a), slices.Index(priorityColumns, b)
		switch {
		case ai != -1 && bi != -1:
			return ai - bi
		case ai != -1:
			return -1
		case bi != -1:
			return 1
		default:
			if a < b {
				return -1
			} else if a > b {
				return 1
			}
			return 0
		}
	})
	return header
}
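To make the resulting order concrete, here is a standalone repro of the same comparator over sample keys, using strings.Compare for the alphabetical branch in place of the if/else chain (behavior is identical):

package main

import (
	"fmt"
	"slices"
	"strings"
)

var priority = []string{"timestamp", "id"} // mirrors priorityColumns

func main() {
	header := []string{"level", "id", "body", "timestamp"}
	slices.SortFunc(header, func(a, b string) int {
		ai, bi := slices.Index(priority, a), slices.Index(priority, b)
		switch {
		case ai != -1 && bi != -1:
			return ai - bi // both priority columns: keep their declared order
		case ai != -1:
			return -1 // priority columns sort before everything else
		case bi != -1:
			return 1
		default:
			return strings.Compare(a, b) // remaining columns alphabetically
		}
	})
	fmt.Println(header) // [timestamp id body level]
}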
@@ -427,9 +311,12 @@ func constructCSVRecordFromQueryResponse(data map[string]any, headerToIndexMappi
			valueStr = v.String()

		default:
			// For all other complex types (maps, structs, etc.)
			jsonBytes, _ := json.Marshal(v)
			valueStr = string(jsonBytes)
			jsonBytes, err := json.Marshal(v)
			if err != nil {
				valueStr = fmt.Sprintf("%v", v)
			} else {
				valueStr = string(jsonBytes)
			}
		}

		record[index] = sanitizeForCSV(valueStr)
@@ -438,26 +325,6 @@ func constructCSVRecordFromQueryResponse(data map[string]any, headerToIndexMappi
	return record
}
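sanitizeForCSV itself is not shown in this diff. A common implementation of such a helper guards against CSV formula injection by neutralizing cells that start with =, +, -, or @; the sketch below is an assumption about intent, not necessarily what SigNoz does:

package main

import (
	"fmt"
	"strings"
)

// sanitizeForCSV (assumed sketch): prefix a single quote so spreadsheet
// applications do not interpret the cell as a formula.
func sanitizeForCSV(value string) string {
	if len(value) > 0 && strings.ContainsRune("=+-@", rune(value[0])) {
		return "'" + value
	}
	return value
}

func main() {
	fmt.Println(sanitizeForCSV(`=HYPERLINK("http://example.invalid")`)) // '=HYPERLINK(...)
	fmt.Println(sanitizeForCSV("plain text"))                           // unchanged
}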
// getExportQueryColumns parses the "columns" query parameters and returns a slice of TelemetryFieldKey structs.
// Each column should be a valid telemetry field key in the format "context.field:type" or "context.field" or "field"
func getExportQueryColumns(queryParams url.Values) []telemetrytypes.TelemetryFieldKey {
	columnParams := queryParams["columns"]

	columns := make([]telemetrytypes.TelemetryFieldKey, 0, len(columnParams))

	for _, columnStr := range columnParams {
		// Skip empty strings
		columnStr = strings.TrimSpace(columnStr)
		if columnStr == "" {
			continue
		}

		columns = append(columns, telemetrytypes.GetFieldKeyFromKeyText(columnStr))
	}

	return columns
}

func getsizeOfStringSlice(slice []string) uint64 {
	var totalBytes uint64
	for _, str := range slice {
@@ -465,52 +332,3 @@ func getsizeOfStringSlice(slice []string) uint64 {
	}
	return totalBytes
}
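The key-text formats that GetFieldKeyFromKeyText accepts are easiest to see by example. The expectations below are taken from the test cases later in this diff; the printed shapes are approximate:

package main

import (
	"fmt"

	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)

func main() {
	// "context.field:type" — explicit field context and data type
	fmt.Printf("%+v\n", telemetrytypes.GetFieldKeyFromKeyText("attribute.user:string"))
	// => Name: "user", FieldContext: attribute, FieldDataType: string

	// "context.field" — the dotted remainder stays in the name, no data type
	fmt.Printf("%+v\n", telemetrytypes.GetFieldKeyFromKeyText("attribute.user.string"))
	// => Name: "user.string", FieldContext: attribute

	// bare "field" — no context, no type
	fmt.Printf("%+v\n", telemetrytypes.GetFieldKeyFromKeyText("timestamp"))
	// => Name: "timestamp"
}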
// getExportQueryOrderBy parses the "order_by" query parameters and returns a slice of OrderBy structs.
// Each "order_by" parameter should be in the format "column:direction"
// Each "column" should be a valid telemetry field key in the format "context.field:type" or "context.field" or "field"
func getExportQueryOrderBy(queryParams url.Values) ([]qbtypes.OrderBy, error) {
	orderByParam := queryParams.Get("order_by")

	orderByParam = strings.TrimSpace(orderByParam)
	if orderByParam == "" {
		return telemetrylogs.DefaultLogsV2SortingOrder, nil
	}

	parts := strings.Split(orderByParam, ":")
	if len(parts) != 2 && len(parts) != 3 {
		return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order_by format: %s, should be <column>:<direction>", orderByParam)
	}

	column := strings.Join(parts[:len(parts)-1], ":")
	direction := parts[len(parts)-1]

	orderDirection, ok := qbtypes.OrderDirectionMap[direction]
	if !ok {
		return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order_by direction: %s, should be one of %s, %s", direction, qbtypes.OrderDirectionAsc, qbtypes.OrderDirectionDesc)
	}

	orderByKey := telemetrytypes.GetFieldKeyFromKeyText(column)

	orderBy := []qbtypes.OrderBy{
		{
			Key: qbtypes.OrderByKey{
				TelemetryFieldKey: orderByKey,
			},
			Direction: orderDirection,
		},
	}

	// If we are ordering by the timestamp column, also order by the ID column
	if orderByKey.Name == telemetrylogs.LogsV2TimestampColumn {
		orderBy = append(orderBy, qbtypes.OrderBy{
			Key: qbtypes.OrderByKey{
				TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
					Name: telemetrylogs.LogsV2IDColumn,
				},
			},
			Direction: orderDirection,
		})
	}
	return orderBy, nil
}
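Because column key text may itself contain a colon (the ":type" suffix), the direction is taken as the last colon-separated token and everything before it is rejoined as the column. A small runnable demonstration of that split; note also that ordering by timestamp appends the ID column as a tiebreaker:

package main

import (
	"fmt"
	"strings"
)

func main() {
	for _, param := range []string{"timestamp:asc", "attribute.user:string:desc"} {
		parts := strings.Split(param, ":")
		column := strings.Join(parts[:len(parts)-1], ":") // rejoin any ":type" suffix
		direction := parts[len(parts)-1]
		fmt.Printf("column=%q direction=%q\n", column, direction)
	}
	// column="timestamp" direction="asc"
	// column="attribute.user:string" direction="desc"
}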
@@ -2,162 +2,84 @@ package implrawdataexport

import (
	"net/url"
	"strconv"
	"testing"

	"github.com/SigNoz/signoz/pkg/telemetrylogs"
	"github.com/SigNoz/signoz/pkg/http/binding"
	"github.com/SigNoz/signoz/pkg/types/exporttypes"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/stretchr/testify/assert"
)

func TestGetExportQuerySource(t *testing.T) {
	tests := []struct {
		name           string
		queryParams    url.Values
		expectedSource string
		expectedError  bool
	}{
		{
			name:           "default logs source",
			queryParams:    url.Values{},
			expectedSource: "logs",
			expectedError:  false,
		},
		{
			name:           "explicit logs source",
			queryParams:    url.Values{"source": {"logs"}},
			expectedSource: "logs",
			expectedError:  false,
		},
		{
			name:           "metrics source - not supported",
			queryParams:    url.Values{"source": {"metrics"}},
			expectedSource: "metrics",
			expectedError:  true,
		},
		{
			name:           "traces source - not supported",
			queryParams:    url.Values{"source": {"traces"}},
			expectedSource: "traces",
			expectedError:  true,
		},
		{
			name:           "invalid source",
			queryParams:    url.Values{"source": {"invalid"}},
			expectedSource: "",
			expectedError:  true,
		},
	}
func TestExportRawDataFormatQueryParam_BindingDefaults(t *testing.T) {
	var params exporttypes.ExportRawDataFormatQueryParam
	err := binding.Query.BindQuery(url.Values{}, &params)
	assert.NoError(t, err)
	assert.Equal(t, "csv", params.Format)
}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			source, err := getExportQuerySource(tt.queryParams)
			assert.Equal(t, tt.expectedSource, source)
			if tt.expectedError {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
			}
		})
func logQuery(limit int) qbtypes.QueryEnvelope {
	return qbtypes.QueryEnvelope{
		Type: qbtypes.QueryTypeBuilder,
		Spec: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{Limit: limit},
	}
}

func TestGetExportQueryFormat(t *testing.T) {
	tests := []struct {
		name           string
		queryParams    url.Values
		expectedFormat string
		expectedError  bool
	}{
		{
			name:           "default csv format",
			queryParams:    url.Values{},
			expectedFormat: "csv",
			expectedError:  false,
		},
		{
			name:           "explicit csv format",
			queryParams:    url.Values{"format": {"csv"}},
			expectedFormat: "csv",
			expectedError:  false,
		},
		{
			name:           "jsonl format",
			queryParams:    url.Values{"format": {"jsonl"}},
			expectedFormat: "jsonl",
			expectedError:  false,
		},
		{
			name:           "invalid format",
			queryParams:    url.Values{"format": {"xml"}},
			expectedFormat: "",
			expectedError:  true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			format, err := getExportQueryFormat(tt.queryParams)
			assert.Equal(t, tt.expectedFormat, format)
			if tt.expectedError {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
			}
		})
func traceQuery(limit int) qbtypes.QueryEnvelope {
	return qbtypes.QueryEnvelope{
		Type: qbtypes.QueryTypeBuilder,
		Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{Limit: limit},
	}
}

func TestGetExportQueryLimit(t *testing.T) {
func traceOperatorQuery(limit int) qbtypes.QueryEnvelope {
	return qbtypes.QueryEnvelope{
		Type: qbtypes.QueryTypeTraceOperator,
		Spec: qbtypes.QueryBuilderTraceOperator{Limit: limit, Expression: "A"},
	}
}

func makeRequest(queries ...qbtypes.QueryEnvelope) qbtypes.QueryRangeRequest {
	return qbtypes.QueryRangeRequest{
		Start:          1000000000000,
		End:            1000003600000,
		RequestType:    qbtypes.RequestTypeRaw,
		CompositeQuery: qbtypes.CompositeQuery{Queries: queries},
	}
}

func TestValidateSpecForExport(t *testing.T) {
	tests := []struct {
		name          string
		queryParams   url.Values
		expectedLimit int
		req           qbtypes.QueryRangeRequest
		expectedError bool
	}{
		{
			name:          "default limit",
			queryParams:   url.Values{},
			expectedLimit: DefaultExportRowCountLimit,
			expectedError: false,
			name: "single log query",
			req:  makeRequest(logQuery(0)),
		},
		{
			name:          "valid limit",
			queryParams:   url.Values{"limit": {"5000"}},
			expectedLimit: 5000,
			expectedError: false,
			name: "single trace query",
			req:  makeRequest(traceQuery(0)),
		},
		{
			name:          "maximum limit",
			queryParams:   url.Values{"limit": {strconv.Itoa(MaxExportRowCountLimit)}},
			expectedLimit: MaxExportRowCountLimit,
			expectedError: false,
			name: "trace operator alone",
			req:  makeRequest(traceOperatorQuery(0)),
		},
		{
			name:          "limit exceeds maximum",
			queryParams:   url.Values{"limit": {"100000"}},
			expectedLimit: 0,
			name:          "multiple queries without trace operator",
			req:           makeRequest(logQuery(0), traceQuery(0)),
			expectedError: true,
		},
		{
			name:          "invalid limit format",
			queryParams:   url.Values{"limit": {"invalid"}},
			expectedLimit: 0,
			expectedError: true,
		},
		{
			name:          "negative limit",
			queryParams:   url.Values{"limit": {"-100"}},
			expectedLimit: 0,
			name:          "unsupported query type",
			req:           makeRequest(qbtypes.QueryEnvelope{Type: qbtypes.QueryTypeBuilder, Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{}}),
			expectedError: true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			limit, err := getExportQueryLimit(tt.queryParams)
			assert.Equal(t, tt.expectedLimit, limit)
			err := validateSpecForExport(&tt.req)
			if tt.expectedError {
				assert.Error(t, err)
			} else {
@@ -167,352 +89,69 @@ func TestGetExportQueryLimit(t *testing.T) {
		}
	}

func TestGetExportQueryTimeRange(t *testing.T) {
	tests := []struct {
		name              string
		queryParams       url.Values
		expectedStartTime uint64
		expectedEndTime   uint64
		expectedError     bool
	}{
		{
			name: "valid time range",
			queryParams: url.Values{
				"start": {"1640995200"},
				"end":   {"1641081600"},
			},
			expectedStartTime: 1640995200,
			expectedEndTime:   1641081600,
			expectedError:     false,
		},
		{
			name:          "missing start time",
			queryParams:   url.Values{"end": {"1641081600"}},
			expectedError: true,
		},
		{
			name:          "missing end time",
			queryParams:   url.Values{"start": {"1640995200"}},
			expectedError: true,
		},
		{
			name:          "missing both times",
			queryParams:   url.Values{},
			expectedError: true,
		},
		{
			name: "invalid start time format",
			queryParams: url.Values{
				"start": {"invalid"},
				"end":   {"1641081600"},
			},
			expectedError: true,
		},
		{
			name: "invalid end time format",
			queryParams: url.Values{
				"start": {"1640995200"},
				"end":   {"invalid"},
			},
			expectedError: true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			startTime, endTime, err := getExportQueryTimeRange(tt.queryParams)
			if tt.expectedError {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
				assert.Equal(t, tt.expectedStartTime, startTime)
				assert.Equal(t, tt.expectedEndTime, endTime)
			}
		})
	}
}

func TestGetExportQueryColumns(t *testing.T) {
	tests := []struct {
		name            string
		queryParams     url.Values
		expectedColumns []telemetrytypes.TelemetryFieldKey
	}{
		{
			name:            "no columns specified",
			queryParams:     url.Values{},
			expectedColumns: []telemetrytypes.TelemetryFieldKey{},
		},
		{
			name: "single column",
			queryParams: url.Values{
				"columns": {"timestamp"},
			},
			expectedColumns: []telemetrytypes.TelemetryFieldKey{
				{Name: "timestamp"},
			},
		},
		{
			name: "multiple columns",
			queryParams: url.Values{
				"columns": {"timestamp", "message", "level"},
			},
			expectedColumns: []telemetrytypes.TelemetryFieldKey{
				{Name: "timestamp"},
				{Name: "message"},
				{Name: "level"},
			},
		},
		{
			name: "empty column name (should be skipped)",
			queryParams: url.Values{
				"columns": {"timestamp", "", "level"},
			},
			expectedColumns: []telemetrytypes.TelemetryFieldKey{
				{Name: "timestamp"},
				{Name: "level"},
			},
		},
		{
			name: "whitespace column name (should be skipped)",
			queryParams: url.Values{
				"columns": {"timestamp", " ", "level"},
			},
			expectedColumns: []telemetrytypes.TelemetryFieldKey{
				{Name: "timestamp"},
				{Name: "level"},
			},
		},
		{
			name: "valid column name with data type",
			queryParams: url.Values{
				"columns": {"timestamp", "attribute.user:string", "level"},
			},
			expectedColumns: []telemetrytypes.TelemetryFieldKey{
				{Name: "timestamp"},
				{Name: "user", FieldContext: telemetrytypes.FieldContextAttribute, FieldDataType: telemetrytypes.FieldDataTypeString},
				{Name: "level"},
			},
		},
		{
			name: "valid column name with dot notation",
			queryParams: url.Values{
				"columns": {"timestamp", "attribute.user.string", "level"},
			},
			expectedColumns: []telemetrytypes.TelemetryFieldKey{
				{Name: "timestamp"},
				{Name: "user.string", FieldContext: telemetrytypes.FieldContextAttribute},
				{Name: "level"},
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			columns := getExportQueryColumns(tt.queryParams)
			assert.Equal(t, len(tt.expectedColumns), len(columns))
			for i, expectedCol := range tt.expectedColumns {
				assert.Equal(t, expectedCol, columns[i])
			}
		})
	}
}

func TestGetExportQueryOrderBy(t *testing.T) {
func TestValidateAndApplyDefaultExportLimits(t *testing.T) {
	tests := []struct {
		name          string
		queryParams   url.Values
		expectedOrder []qbtypes.OrderBy
		queries       []qbtypes.QueryEnvelope
		expectedError bool
		checkQueries  func(t *testing.T, queries []qbtypes.QueryEnvelope)
	}{
		{
			name:        "no order specified",
			queryParams: url.Values{},
			expectedOrder: []qbtypes.OrderBy{
				{
					Direction: qbtypes.OrderDirectionDesc,
					Key: qbtypes.OrderByKey{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name: telemetrylogs.LogsV2TimestampColumn,
						},
					},
				},
				{
					Direction: qbtypes.OrderDirectionDesc,
					Key: qbtypes.OrderByKey{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name: telemetrylogs.LogsV2IDColumn,
						},
					},
				},
			name:    "single log query, zero limit gets default",
			queries: makeRequest(logQuery(0)).CompositeQuery.Queries,
			checkQueries: func(t *testing.T, q []qbtypes.QueryEnvelope) {
				assert.Equal(t, DefaultExportRowCountLimit, q[0].GetLimit())
			},
			expectedError: false,
		},
		{
			name: "single order error, direction not specified",
			queryParams: url.Values{
				"order_by": {"timestamp"},
			name:    "single log query, valid limit kept",
			queries: makeRequest(logQuery(1000)).CompositeQuery.Queries,
			checkQueries: func(t *testing.T, q []qbtypes.QueryEnvelope) {
				assert.Equal(t, 1000, q[0].GetLimit())
			},
			expectedOrder: nil,
		},
		{
			name:    "single log query, max limit kept",
			queries: makeRequest(logQuery(MaxExportRowCountLimit)).CompositeQuery.Queries,
			checkQueries: func(t *testing.T, q []qbtypes.QueryEnvelope) {
				assert.Equal(t, MaxExportRowCountLimit, q[0].GetLimit())
			},
		},
		{
			name:          "single log query, limit exceeds max",
			queries:       makeRequest(logQuery(MaxExportRowCountLimit + 1)).CompositeQuery.Queries,
			expectedError: true,
		},
		{
			name: "single order no error",
			queryParams: url.Values{
				"order_by": {"timestamp:asc"},
			},
			expectedOrder: []qbtypes.OrderBy{
				{
					Direction: qbtypes.OrderDirectionAsc,
					Key: qbtypes.OrderByKey{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name: telemetrylogs.LogsV2TimestampColumn,
						},
					},
				},
				{
					Direction: qbtypes.OrderDirectionAsc,
					Key: qbtypes.OrderByKey{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name: telemetrylogs.LogsV2IDColumn,
						},
					},
				},
			},
			expectedError: false,
		},
		{
			name: "multiple orders",
			queryParams: url.Values{
				"order_by": {"timestamp:asc", "body:desc", "id:asc"},
			},
			expectedOrder: []qbtypes.OrderBy{
				{
					Direction: qbtypes.OrderDirectionAsc,
					Key: qbtypes.OrderByKey{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name: telemetrylogs.LogsV2TimestampColumn,
						},
					},
				},
				{
					Direction: qbtypes.OrderDirectionAsc,
					Key: qbtypes.OrderByKey{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name: telemetrylogs.LogsV2IDColumn,
						},
					},
				},
			},
			expectedError: false,
		},
		{
			name: "empty order name (should be skipped)",
			queryParams: url.Values{
				"order_by": {"timestamp:asc", "", "id:asc"},
			},
			expectedOrder: []qbtypes.OrderBy{
				{
					Direction: qbtypes.OrderDirectionAsc,
					Key: qbtypes.OrderByKey{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name: telemetrylogs.LogsV2TimestampColumn,
						},
					},
				},
				{
					Direction: qbtypes.OrderDirectionAsc,
					Key: qbtypes.OrderByKey{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name: telemetrylogs.LogsV2IDColumn,
						},
					},
				},
			},
			expectedError: false,
		},
		{
			name: "whitespace order name (should be skipped)",
			queryParams: url.Values{
				"order_by": {"timestamp:asc", " ", "id:asc"},
			},
			expectedOrder: []qbtypes.OrderBy{
				{
					Direction: qbtypes.OrderDirectionAsc,
					Key: qbtypes.OrderByKey{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name: telemetrylogs.LogsV2TimestampColumn,
						},
					},
				},
				{
					Direction: qbtypes.OrderDirectionAsc,
					Key: qbtypes.OrderByKey{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name: telemetrylogs.LogsV2IDColumn,
						},
					},
				},
			},
			expectedError: false,
		},
		{
			name: "invalid order name (should error out)",
			queryParams: url.Values{
				"order_by": {"attributes.user:", "id:asc"},
			},
			expectedOrder: nil,
			name:          "single log query, negative limit",
			queries:       makeRequest(logQuery(-1)).CompositeQuery.Queries,
			expectedError: true,
		},
		{
			name: "valid order name (should be included)",
			queryParams: url.Values{
				"order_by": {"attribute.user:string:desc", "id:asc"},
			name:    "single trace query, zero limit gets default",
			queries: makeRequest(traceQuery(0)).CompositeQuery.Queries,
			checkQueries: func(t *testing.T, q []qbtypes.QueryEnvelope) {
				assert.Equal(t, DefaultExportRowCountLimit, q[0].GetLimit())
			},
			expectedOrder: []qbtypes.OrderBy{
				{
					Direction: qbtypes.OrderDirectionDesc,
					Key: qbtypes.OrderByKey{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name:          "user",
							FieldContext:  telemetrytypes.FieldContextAttribute,
							FieldDataType: telemetrytypes.FieldDataTypeString,
						},
					},
				},
			},
			expectedError: false,
		},
		{
			name: "valid order name (should be included)",
			queryParams: url.Values{
				"order_by": {"attribute.user.string:desc", "id:asc"},
			name:    "trace operator alone, zero limit gets default",
			queries: makeRequest(traceOperatorQuery(0)).CompositeQuery.Queries,
			checkQueries: func(t *testing.T, q []qbtypes.QueryEnvelope) {
				assert.Equal(t, DefaultExportRowCountLimit, q[0].GetLimit())
			},
			expectedOrder: []qbtypes.OrderBy{
				{
					Direction: qbtypes.OrderDirectionDesc,
					Key: qbtypes.OrderByKey{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name:         "user.string",
							FieldContext: telemetrytypes.FieldContextAttribute,
						},
					},
				},
			},
			expectedError: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			order, err := getExportQueryOrderBy(tt.queryParams)
			err := validateAndApplyDefaultExportLimits(tt.queries)
			if tt.expectedError {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
				assert.Equal(t, len(tt.expectedOrder), len(order))
				for i, expectedOrd := range tt.expectedOrder {
					assert.Equal(t, expectedOrd, order[i])
				if tt.checkQueries != nil {
					tt.checkQueries(t, tt.queries)
				}
			}
		})
@@ -529,13 +168,8 @@ func TestConstructCSVHeaderFromQueryResponse(t *testing.T) {

	header := constructCSVHeaderFromQueryResponse(data)

	// Since map iteration order is not guaranteed, check that all expected keys are present
	expectedKeys := []string{"timestamp", "message", "level", "id"}
	assert.Equal(t, len(expectedKeys), len(header))

	for _, key := range expectedKeys {
		assert.Contains(t, header, key)
	}
	// Priority columns come first in order, then the rest alphabetically.
	assert.Equal(t, []string{"timestamp", "id", "level", "message"}, header)
}

func TestConstructCSVRecordFromQueryResponse(t *testing.T) {
@@ -28,8 +28,18 @@ func (m *Module) ExportRawData(ctx context.Context, orgID valuer.UUID, rangeRequ
		instrumentationtypes.CodeFunctionName: "ExportRawData",
	})

	spec := rangeRequest.CompositeQuery.Queries[0].Spec.(qbtypes.QueryBuilderQuery[qbtypes.LogAggregation])
	rowCountLimit := spec.Limit
	traceOperatorQueryIndex := rangeRequest.TraceOperatorQueryIndex()

	queries := rangeRequest.CompositeQuery.Queries

	// If the trace operator query is present, mark the queries other than trace operator as disabled
	if traceOperatorQueryIndex > -1 {
		for idx := range len(queries) {
			if idx != traceOperatorQueryIndex {
				queries[idx].SetDisabled(true)
			}
		}
	}

	rowChan := make(chan *qbtypes.RawRow, 1)
	errChan := make(chan error, 1)
@@ -43,52 +53,62 @@ func (m *Module) ExportRawData(ctx context.Context, orgID valuer.UUID, rangeRequ
		defer close(errChan)
		defer close(rowChan)

		rowCount := 0

		for rowCount < rowCountLimit {
			spec.Limit = min(ChunkSize, rowCountLimit-rowCount)
			spec.Offset = rowCount

			rangeRequest.CompositeQuery.Queries[0].Spec = spec

			response, err := m.querier.QueryRange(contextWithTimeout, orgID, rangeRequest)
			if err != nil {
				errChan <- err
				return
			}

			newRowsCount := 0
			for _, result := range response.Data.Results {
				resultData, ok := result.(*qbtypes.RawData)
				if !ok {
					errChan <- errors.NewInternalf(errors.CodeInternal, "expected RawData, got %T", result)
					return
				}

				newRowsCount += len(resultData.Rows)
				for _, row := range resultData.Rows {
					select {
					case rowChan <- row:
					case <-doneChan:
						return
					case <-ctx.Done():
						errChan <- ctx.Err()
						return
					}
				}
			}

			// Break if we did not receive any new rows
			if newRowsCount == 0 {
				return
			}

			rowCount += newRowsCount

		if traceOperatorQueryIndex > -1 {
			// If the trace operator query is present, we need to export the data for the trace operator query only
			exportRawDataForSingleQuery(m.querier, contextWithTimeout, orgID, rangeRequest, rowChan, errChan, doneChan, traceOperatorQueryIndex)
		} else {
			// If the trace operator query is not present, we need to export the data for the first query only
			exportRawDataForSingleQuery(m.querier, contextWithTimeout, orgID, rangeRequest, rowChan, errChan, doneChan, 0)
		}
	}()

	return rowChan, errChan

}
func exportRawDataForSingleQuery(querier querier.Querier, ctx context.Context, orgID valuer.UUID, rangeRequest *qbtypes.QueryRangeRequest, rowChan chan *qbtypes.RawRow, errChan chan error, doneChan chan any, queryIndex int) {
	queries := rangeRequest.CompositeQuery.Queries
	rowCountLimit := queries[queryIndex].GetLimit()
	rowCount := 0

	for rowCount < rowCountLimit {
		chunkSize := min(ChunkSize, rowCountLimit-rowCount)
		queries[queryIndex].SetLimit(chunkSize)
		queries[queryIndex].SetOffset(rowCount)

		response, err := querier.QueryRange(ctx, orgID, rangeRequest)
		if err != nil {
			errChan <- err
			return
		}

		newRowsCount := 0
		for _, result := range response.Data.Results {
			resultData, ok := result.(*qbtypes.RawData)
			if !ok {
				errChan <- errors.NewInternalf(errors.CodeInternal, "expected RawData, got %T", result)
				return
			}

			newRowsCount += len(resultData.Rows)
			for _, row := range resultData.Rows {
				select {
				case rowChan <- row:
				case <-doneChan:
					return
				case <-ctx.Done():
					errChan <- ctx.Err()
					return
				}
			}
		}

		rowCount += newRowsCount

		// Stop if we received fewer rows than requested: no more data is available
		if newRowsCount < chunkSize {
			return
		}
	}
}
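The function above is the producer half of a channel-based pipeline: it paginates with limit/offset chunks and pushes rows into rowChan, while the consumer drains rowChan until it closes and then checks errChan. A standalone sketch of that contract with simplified stand-in types (not the SigNoz qbtypes definitions):

package main

import "fmt"

type rawRow struct{ data string }

func produce(rowChan chan<- *rawRow, errChan chan<- error) {
	defer close(errChan) // closed last: consumer reads it after rowChan drains
	defer close(rowChan)
	const limit, chunk = 5, 2
	for offset := 0; offset < limit; offset += chunk {
		// a real implementation would run a limit/offset query here
		for i := 0; i < chunk && offset+i < limit; i++ {
			rowChan <- &rawRow{data: fmt.Sprintf("row-%d", offset+i)}
		}
	}
}

func main() {
	rowChan := make(chan *rawRow, 1)
	errChan := make(chan error, 1)
	go produce(rowChan, errChan)
	for row := range rowChan { // ends when the producer closes rowChan
		fmt.Println(row.data)
	}
	if err := <-errChan; err != nil { // nil on a cleanly closed channel
		fmt.Println("export failed:", err)
	}
}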
@@ -4,6 +4,7 @@ import (
	"context"
	"encoding/base64"
	"fmt"
	"log/slog"
	"strconv"
	"strings"
	"time"
@@ -11,6 +12,7 @@ import (
	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/telemetrystore"
	"github.com/SigNoz/signoz/pkg/telemetrytraces"
	"github.com/SigNoz/signoz/pkg/types/ctxtypes"
	"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -18,6 +20,7 @@ import (
)

type builderQuery[T any] struct {
	logger         *slog.Logger
	telemetryStore telemetrystore.TelemetryStore
	stmtBuilder    qbtypes.StatementBuilder[T]
	spec           qbtypes.QueryBuilderQuery[T]
@@ -31,6 +34,7 @@ type builderQuery[T any] struct {
var _ qbtypes.Query = (*builderQuery[any])(nil)

func newBuilderQuery[T any](
	logger *slog.Logger,
	telemetryStore telemetrystore.TelemetryStore,
	stmtBuilder qbtypes.StatementBuilder[T],
	spec qbtypes.QueryBuilderQuery[T],
@@ -39,6 +43,7 @@ func newBuilderQuery[T any](
	variables map[string]qbtypes.VariableItem,
) *builderQuery[T] {
	return &builderQuery[T]{
		logger:         logger,
		telemetryStore: telemetryStore,
		stmtBuilder:    stmtBuilder,
		spec:           spec,
@@ -305,6 +310,45 @@ func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Resul
	totalBytes := uint64(0)
	start := time.Now()

	// Check if filter contains trace_id(s) and optimize time range if needed
	if q.spec.Signal == telemetrytypes.SignalTraces &&
		q.spec.Filter != nil && q.spec.Filter.Expression != "" {

		traceIDs, found := telemetrytraces.ExtractTraceIDsFromFilter(q.spec.Filter.Expression)
		if found && len(traceIDs) > 0 {
			finder := telemetrytraces.NewTraceTimeRangeFinder(q.telemetryStore)

			traceStart, traceEnd, ok := finder.GetTraceTimeRangeMulti(ctx, traceIDs)
			traceStartMS := uint64(traceStart) / 1_000_000
			traceEndMS := uint64(traceEnd) / 1_000_000
			if !ok {
				q.logger.DebugContext(ctx, "failed to get trace time range", slog.Any("trace_ids", traceIDs))
			} else if traceStartMS > 0 && traceEndMS > 0 {
				// no overlap: nothing to return
				if uint64(traceStartMS) > toMS || uint64(traceEndMS) < fromMS {
					return &qbtypes.Result{
						Type: qbtypes.RequestTypeRaw,
						Value: &qbtypes.RawData{
							QueryName: q.spec.Name,
						},
						Stats: qbtypes.ExecStats{
							DurationMS: uint64(time.Since(start).Milliseconds()),
						},
					}, nil
				}

				// clamp window to trace time range before bucketing
				if uint64(traceStartMS) > fromMS {
					fromMS = uint64(traceStartMS)
				}
				if uint64(traceEndMS) < toMS {
					toMS = uint64(traceEndMS)
				}
				q.logger.DebugContext(ctx, "optimized time range for traces", slog.Any("trace_ids", traceIDs), slog.Uint64("start", fromMS), slog.Uint64("end", toMS))
			}
		}
	}

	// Get buckets and reverse them for ascending order
	buckets := makeBuckets(fromMS, toMS)
	if isAsc {
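The clamp step above is simple arithmetic: trace timestamps come back in nanoseconds, are converted to milliseconds, and then shrink the requested window. A small self-contained illustration with made-up numbers:

package main

import "fmt"

func main() {
	var fromMS, toMS uint64 = 1_700_000_000_000, 1_700_000_600_000 // requested 10-minute window

	traceStartNS := uint64(1_700_000_120_000_000_000)
	traceEndNS := uint64(1_700_000_130_000_000_000)
	traceStartMS, traceEndMS := traceStartNS/1_000_000, traceEndNS/1_000_000

	if traceStartMS > toMS || traceEndMS < fromMS {
		fmt.Println("no overlap: return an empty result")
		return
	}
	if traceStartMS > fromMS {
		fromMS = traceStartMS
	}
	if traceEndMS < toMS {
		toMS = traceEndMS
	}
	fmt.Println(fromMS, toMS) // clamped to the ~10s span the trace actually covers
}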
@@ -353,13 +353,13 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
	case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
		spec.ShiftBy = extractShiftFromBuilderQuery(spec)
		timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
		bq := newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
		bq := newBuilderQuery(q.logger, q.telemetryStore, q.traceStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
		queries[spec.Name] = bq
		steps[spec.Name] = spec.StepInterval
	case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
		spec.ShiftBy = extractShiftFromBuilderQuery(spec)
		timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
		bq := newBuilderQuery(q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
		bq := newBuilderQuery(q.logger, q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
		queries[spec.Name] = bq
		steps[spec.Name] = spec.StepInterval
	case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
@@ -397,9 +397,9 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype

		if spec.Source == telemetrytypes.SourceMeter {
			event.Source = telemetrytypes.SourceMeter.StringValue()
			bq = newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
			bq = newBuilderQuery(q.logger, q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
		} else {
			bq = newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
			bq = newBuilderQuery(q.logger, q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
		}

		queries[spec.Name] = bq
@@ -509,7 +509,7 @@ func (q *querier) QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qb
		case <-tick:
			// timestamp end is not specified here
			timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: tsStart}, req.RequestType)
			bq := newBuilderQuery(q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType, map[string]qbtypes.VariableItem{
			bq := newBuilderQuery(q.logger, q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType, map[string]qbtypes.VariableItem{
				"id": {
					Value: updatedLogID,
				},
@@ -801,22 +801,22 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp
		specCopy := qt.spec.Copy()
		specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
		adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
		return newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
		return newBuilderQuery(q.logger, q.telemetryStore, q.traceStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)

	case *builderQuery[qbtypes.LogAggregation]:
		specCopy := qt.spec.Copy()
		specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
		adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
		return newBuilderQuery(q.telemetryStore, q.logStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
		return newBuilderQuery(q.logger, q.telemetryStore, q.logStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)

	case *builderQuery[qbtypes.MetricAggregation]:
		specCopy := qt.spec.Copy()
		specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
		adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
		if qt.spec.Source == telemetrytypes.SourceMeter {
			return newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
			return newBuilderQuery(q.logger, q.telemetryStore, q.meterStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
		}
		return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
		return newBuilderQuery(q.logger, q.telemetryStore, q.metricStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
	case *traceOperatorQuery:
		specCopy := qt.spec.Copy()
		return &traceOperatorQuery{
@@ -576,9 +576,6 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) {
		aH.LicensingAPI.Activate(rw, req)
	})).Methods(http.MethodGet)

	// Export
	router.HandleFunc("/api/v1/export_raw_data", am.ViewAccess(aH.Signoz.Handlers.RawDataExport.ExportRawData)).Methods(http.MethodGet)

	router.HandleFunc("/api/v1/span_percentile", am.ViewAccess(aH.Signoz.Handlers.SpanPercentile.GetSpanPercentileDetails)).Methods(http.MethodPost)

	// Query Filter Analyzer api used to extract metric names and grouping columns from a query
@@ -12,6 +12,8 @@ import (
	"github.com/SigNoz/signoz/pkg/licensing"
	"github.com/SigNoz/signoz/pkg/modules/apdex"
	"github.com/SigNoz/signoz/pkg/modules/apdex/implapdex"
	"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
	"github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
	"github.com/SigNoz/signoz/pkg/modules/dashboard"
	"github.com/SigNoz/signoz/pkg/modules/dashboard/impldashboard"
	"github.com/SigNoz/signoz/pkg/modules/fields"
@@ -38,24 +40,25 @@ import (
)

type Handlers struct {
	SavedView             savedview.Handler
	Apdex                 apdex.Handler
	Dashboard             dashboard.Handler
	QuickFilter           quickfilter.Handler
	TraceFunnel           tracefunnel.Handler
	RawDataExport         rawdataexport.Handler
	SpanPercentile        spanpercentile.Handler
	Services              services.Handler
	MetricsExplorer       metricsexplorer.Handler
	Global                global.Handler
	FlaggerHandler        flagger.Handler
	GatewayHandler        gateway.Handler
	Fields                fields.Handler
	AuthzHandler          authz.Handler
	ZeusHandler           zeus.Handler
	QuerierHandler        querier.Handler
	ServiceAccountHandler serviceaccount.Handler
	RegistryHandler       factory.Handler
	SavedView               savedview.Handler
	Apdex                   apdex.Handler
	Dashboard               dashboard.Handler
	QuickFilter             quickfilter.Handler
	TraceFunnel             tracefunnel.Handler
	RawDataExport           rawdataexport.Handler
	SpanPercentile          spanpercentile.Handler
	Services                services.Handler
	MetricsExplorer         metricsexplorer.Handler
	Global                  global.Handler
	FlaggerHandler          flagger.Handler
	GatewayHandler          gateway.Handler
	Fields                  fields.Handler
	AuthzHandler            authz.Handler
	ZeusHandler             zeus.Handler
	QuerierHandler          querier.Handler
	ServiceAccountHandler   serviceaccount.Handler
	RegistryHandler         factory.Handler
	CloudIntegrationHandler cloudintegration.Handler
}

func NewHandlers(
@@ -73,23 +76,24 @@ func NewHandlers(
	registryHandler factory.Handler,
) Handlers {
	return Handlers{
		SavedView:             implsavedview.NewHandler(modules.SavedView),
		Apdex:                 implapdex.NewHandler(modules.Apdex),
		Dashboard:             impldashboard.NewHandler(modules.Dashboard, providerSettings),
		QuickFilter:           implquickfilter.NewHandler(modules.QuickFilter),
		TraceFunnel:           impltracefunnel.NewHandler(modules.TraceFunnel),
		RawDataExport:         implrawdataexport.NewHandler(modules.RawDataExport),
		Services:              implservices.NewHandler(modules.Services),
		MetricsExplorer:       implmetricsexplorer.NewHandler(modules.MetricsExplorer),
		SpanPercentile:        implspanpercentile.NewHandler(modules.SpanPercentile),
		Global:                signozglobal.NewHandler(global),
		FlaggerHandler:        flagger.NewHandler(flaggerService),
		GatewayHandler:        gateway.NewHandler(gatewayService),
		Fields:                implfields.NewHandler(providerSettings, telemetryMetadataStore),
		AuthzHandler:          signozauthzapi.NewHandler(authz),
		ZeusHandler:           zeus.NewHandler(zeusService, licensing),
		QuerierHandler:        querierHandler,
		ServiceAccountHandler: implserviceaccount.NewHandler(modules.ServiceAccount),
		RegistryHandler:       registryHandler,
		SavedView:               implsavedview.NewHandler(modules.SavedView),
		Apdex:                   implapdex.NewHandler(modules.Apdex),
		Dashboard:               impldashboard.NewHandler(modules.Dashboard, providerSettings),
		QuickFilter:             implquickfilter.NewHandler(modules.QuickFilter),
		TraceFunnel:             impltracefunnel.NewHandler(modules.TraceFunnel),
		RawDataExport:           implrawdataexport.NewHandler(modules.RawDataExport),
		Services:                implservices.NewHandler(modules.Services),
		MetricsExplorer:         implmetricsexplorer.NewHandler(modules.MetricsExplorer),
		SpanPercentile:          implspanpercentile.NewHandler(modules.SpanPercentile),
		Global:                  signozglobal.NewHandler(global),
		FlaggerHandler:          flagger.NewHandler(flaggerService),
		GatewayHandler:          gateway.NewHandler(gatewayService),
		Fields:                  implfields.NewHandler(providerSettings, telemetryMetadataStore),
		AuthzHandler:            signozauthzapi.NewHandler(authz),
		ZeusHandler:             zeus.NewHandler(zeusService, licensing),
		QuerierHandler:          querierHandler,
		ServiceAccountHandler:   implserviceaccount.NewHandler(modules.ServiceAccount),
		RegistryHandler:         registryHandler,
		CloudIntegrationHandler: implcloudintegration.NewHandler(),
	}
}
@@ -17,12 +17,14 @@ import (
	"github.com/SigNoz/signoz/pkg/http/handler"
	"github.com/SigNoz/signoz/pkg/instrumentation"
	"github.com/SigNoz/signoz/pkg/modules/authdomain"
	"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
	"github.com/SigNoz/signoz/pkg/modules/dashboard"
	"github.com/SigNoz/signoz/pkg/modules/fields"
	"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
	"github.com/SigNoz/signoz/pkg/modules/organization"
	"github.com/SigNoz/signoz/pkg/modules/preference"
	"github.com/SigNoz/signoz/pkg/modules/promote"
	"github.com/SigNoz/signoz/pkg/modules/rawdataexport"
	"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
	"github.com/SigNoz/signoz/pkg/modules/session"
	"github.com/SigNoz/signoz/pkg/modules/user"
@@ -59,10 +61,12 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
	struct{ gateway.Handler }{},
	struct{ fields.Handler }{},
	struct{ authz.Handler }{},
	struct{ rawdataexport.Handler }{},
	struct{ zeus.Handler }{},
	struct{ querier.Handler }{},
	struct{ serviceaccount.Handler }{},
	struct{ factory.Handler }{},
	struct{ cloudintegration.Handler }{},
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
if err != nil {
	return nil, err
@@ -274,10 +274,12 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
			handlers.GatewayHandler,
			handlers.Fields,
			handlers.AuthzHandler,
			handlers.RawDataExport,
			handlers.ZeusHandler,
			handlers.QuerierHandler,
			handlers.ServiceAccountHandler,
			handlers.RegistryHandler,
			handlers.CloudIntegrationHandler,
		),
	)
}
@@ -111,23 +111,6 @@ func (b *traceQueryStatementBuilder) Build(

	query = b.adjustKeys(ctx, keys, query, requestType)

	// Check if filter contains trace_id(s) and optimize time range if needed
	if query.Filter != nil && query.Filter.Expression != "" && b.telemetryStore != nil {
		traceIDs, found := ExtractTraceIDsFromFilter(query.Filter.Expression)
		if found && len(traceIDs) > 0 {
			finder := NewTraceTimeRangeFinder(b.telemetryStore)

			traceStart, traceEnd, ok := finder.GetTraceTimeRangeMulti(ctx, traceIDs)
			if !ok {
				b.logger.DebugContext(ctx, "failed to get trace time range", slog.Any("trace_ids", traceIDs))
			} else if traceStart > 0 && traceEnd > 0 {
				start = uint64(traceStart)
				end = uint64(traceEnd)
				b.logger.DebugContext(ctx, "optimized time range for traces", slog.Any("trace_ids", traceIDs), slog.Uint64("start", start), slog.Uint64("end", end))
			}
		}
	}

	// Create SQL builder
	q := sqlbuilder.NewSelectBuilder()

@@ -10,34 +10,35 @@ import (
type Account struct {
	types.Identifiable
	types.TimeAuditable
	ProviderAccountId *string           `json:"providerAccountID,omitempty"`
	Provider          CloudProviderType `json:"provider"`
	RemovedAt         *time.Time        `json:"removedAt,omitempty"`
	AgentReport       *AgentReport      `json:"agentReport,omitempty"`
	OrgID             valuer.UUID       `json:"orgID"`
	Config            *AccountConfig    `json:"config,omitempty"`
	ProviderAccountID *string           `json:"providerAccountId" required:"true" nullable:"true"`
	Provider          CloudProviderType `json:"provider" required:"true"`
	RemovedAt         *time.Time        `json:"removedAt" required:"true" nullable:"true"`
	AgentReport       *AgentReport      `json:"agentReport" required:"true" nullable:"true"`
	OrgID             valuer.UUID       `json:"orgId" required:"true"`
	Config            *AccountConfig    `json:"config" required:"true" nullable:"false"`
}

// AgentReport represents heartbeats sent by the agent.
type AgentReport struct {
	TimestampMillis int64          `json:"timestampMillis"`
	Data            map[string]any `json:"data"`
	TimestampMillis int64          `json:"timestampMillis" required:"true"`
	Data            map[string]any `json:"data" required:"true" nullable:"true"`
}

type AccountConfig struct {
	// required till new providers are added
	AWS *AWSAccountConfig `json:"aws" required:"true" nullable:"false"`
}

type GettableAccounts struct {
	Accounts []*Account `json:"accounts"`
	Accounts []*Account `json:"accounts" required:"true" nullable:"false"`
}

type GettableAccount = Account

type UpdatableAccount struct {
	Config *AccountConfig `json:"config"`
}

type AccountConfig struct {
	AWS *AWSAccountConfig `json:"aws,omitempty"`
	Config *AccountConfig `json:"config" required:"true" nullable:"false"`
}

type AWSAccountConfig struct {
	Regions []string `json:"regions"`
	Regions []string `json:"regions" required:"true" nullable:"false"`
}
@@ -1,88 +1,81 @@
package cloudintegrationtypes

import "github.com/SigNoz/signoz/pkg/types/integrationtypes"
import (
	"time"

	"github.com/SigNoz/signoz/pkg/valuer"
)

type ConnectionArtifactRequest struct {
	Aws *AWSConnectionArtifactRequest `json:"aws"`
	// required till new providers are added
	Aws *AWSConnectionArtifactRequest `json:"aws" required:"true" nullable:"false"`
}

type AWSConnectionArtifactRequest struct {
	DeploymentRegion string   `json:"deploymentRegion"`
	Regions          []string `json:"regions"`
	DeploymentRegion string   `json:"deploymentRegion" required:"true"`
	Regions          []string `json:"regions" required:"true" nullable:"false"`
}

type PostableConnectionArtifact = ConnectionArtifactRequest

type ConnectionArtifact struct {
	Aws *AWSConnectionArtifact `json:"aws"`
	// required till new providers are added
	Aws *AWSConnectionArtifact `json:"aws" required:"true" nullable:"false"`
}

type AWSConnectionArtifact struct {
	ConnectionUrl string `json:"connectionURL"`
	ConnectionURL string `json:"connectionURL" required:"true"`
}

type GettableConnectionArtifact = ConnectionArtifact

type AccountStatus struct {
	Id                string                         `json:"id"`
	ProviderAccountId *string                        `json:"providerAccountID,omitempty"`
	Status            integrationtypes.AccountStatus `json:"status"`
type GettableAccountWithArtifact struct {
	ID       valuer.UUID         `json:"id" required:"true"`
	Artifact *ConnectionArtifact `json:"connectionArtifact" required:"true"`
}

type GettableAccountStatus = AccountStatus

type AgentCheckInRequest struct {
	// older backward compatible fields are mapped to new fields
	// CloudIntegrationId string `json:"cloudIntegrationId"`
	// AccountId string `json:"accountId"`
	ProviderAccountID  string `json:"providerAccountId" required:"false"`
	CloudIntegrationID string `json:"cloudIntegrationId" required:"false"`

	// New fields
	ProviderAccountId string `json:"providerAccountId"`
	CloudAccountId    string `json:"cloudAccountId"`

	Data map[string]any `json:"data,omitempty"`
	Data map[string]any `json:"data" required:"true" nullable:"true"`
}

type PostableAgentCheckInRequest struct {
	AgentCheckInRequest
	// following are backward compatible fields for older running agents
	// which gets mapped to new fields in AgentCheckInRequest
	CloudIntegrationId string `json:"cloud_integration_id"`
	CloudAccountId     string `json:"cloud_account_id"`
}

type GettableAgentCheckInResponse struct {
	AgentCheckInResponse

	// For backward compatibility
	CloudIntegrationId string `json:"cloud_integration_id"`
	AccountId          string `json:"account_id"`
	ID        string `json:"account_id" required:"false"`       // => CloudIntegrationID
	AccountID string `json:"cloud_account_id" required:"false"` // => ProviderAccountID
}

type AgentCheckInResponse struct {
	// Older fields for backward compatibility are mapped to new fields below
	// CloudIntegrationId string `json:"cloud_integration_id"`
	// AccountId string `json:"account_id"`

	// New fields
	ProviderAccountId string `json:"providerAccountId"`
	CloudAccountId    string `json:"cloudAccountId"`

	// IntegrationConfig populates data related to integration that is required for an agent
	// to start collecting telemetry data
	// keeping JSON key snake_case for backward compatibility
	IntegrationConfig *IntegrationConfig `json:"integration_config,omitempty"`
	CloudIntegrationID string                     `json:"cloudIntegrationId" required:"true"`
	ProviderAccountID  string                     `json:"providerAccountId" required:"true"`
	IntegrationConfig  *ProviderIntegrationConfig `json:"integrationConfig" required:"true"`
	RemovedAt          *time.Time                 `json:"removedAt" required:"true" nullable:"true"`
}

type IntegrationConfig struct {
	EnabledRegions []string               `json:"enabledRegions"`      // backward compatible
	Telemetry      *AWSCollectionStrategy `json:"telemetry,omitempty"` // backward compatible
type GettableAgentCheckInResponse struct {
	// Older fields for backward compatibility with existing AWS agents
	AccountID              string             `json:"account_id" required:"true"`
	CloudAccountID         string             `json:"cloud_account_id" required:"true"`
	OlderIntegrationConfig *IntegrationConfig `json:"integration_config" required:"true" nullable:"true"`
	OlderRemovedAt         *time.Time         `json:"removed_at" required:"true" nullable:"true"`

	// new fields
	AWS *AWSIntegrationConfig `json:"aws,omitempty"`
	AgentCheckInResponse
}

// IntegrationConfig older integration config struct for backward compatibility,
// this will be eventually removed once agents are updated to use new struct.
type IntegrationConfig struct {
	EnabledRegions []string               `json:"enabled_regions" required:"true" nullable:"false"` // backward compatible
	Telemetry      *AWSCollectionStrategy `json:"telemetry" required:"true" nullable:"false"`       // backward compatible
}

type ProviderIntegrationConfig struct {
	AWS *AWSIntegrationConfig `json:"aws" required:"true" nullable:"false"`
}

type AWSIntegrationConfig struct {
	EnabledRegions []string               `json:"enabledRegions"`
	Telemetry      *AWSCollectionStrategy `json:"telemetry,omitempty"`
	EnabledRegions []string               `json:"enabledRegions" required:"true" nullable:"false"`
	Telemetry      *AWSCollectionStrategy `json:"telemetry" required:"true" nullable:"false"`
}
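The comments on PostableAgentCheckInRequest say the deprecated snake_case agent fields get mapped onto the new camelCase AgentCheckInRequest fields. That mapping code is not part of this diff; the helper below is a hedged sketch of what it could look like, with the direction taken from the "=>" comments above:

// normalizeCheckIn is a hypothetical helper, not shown in the SigNoz diff.
func normalizeCheckIn(req *PostableAgentCheckInRequest) {
	if req.ProviderAccountID == "" && req.CloudAccountId != "" {
		req.ProviderAccountID = req.CloudAccountId // old cloud_account_id => providerAccountId
	}
	if req.CloudIntegrationID == "" && req.CloudIntegrationId != "" {
		req.CloudIntegrationID = req.CloudIntegrationId // old cloud_integration_id => cloudIntegrationId
	}
}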
@@ -10,20 +10,19 @@ import (
	"github.com/SigNoz/signoz/pkg/valuer"
)

var (
	S3Sync = valuer.NewString("s3sync")
	// ErrCodeInvalidServiceID is the error code for invalid service id.
	ErrCodeInvalidServiceID = errors.MustNewCode("invalid_service_id")
)

type ServiceID struct{ valuer.String }
var ErrCodeInvalidServiceID = errors.MustNewCode("invalid_service_id")

type CloudIntegrationService struct {
	types.Identifiable
	types.TimeAuditable
	Type               ServiceID      `json:"type"`
	Config             *ServiceConfig `json:"config"`
	CloudIntegrationID valuer.UUID    `json:"cloudIntegrationID"`
	CloudIntegrationID valuer.UUID    `json:"cloudIntegrationId"`
}

type ServiceConfig struct {
	// required till new providers are added
	AWS *AWSServiceConfig `json:"aws" required:"true" nullable:"false"`
}

// ServiceMetadata helps to quickly list available services and whether it is enabled or not.
@@ -32,26 +31,56 @@ type CloudIntegrationService struct {
type ServiceMetadata struct {
	ServiceDefinitionMetadata
	// if the service is enabled for the account
	Enabled bool `json:"enabled"`
	Enabled bool `json:"enabled" required:"true"`
}

// ServiceDefinitionMetadata represents service definition metadata. This is useful for showing service tab in frontend.
type ServiceDefinitionMetadata struct {
	ID    string `json:"id" required:"true"`
	Title string `json:"title" required:"true"`
	Icon  string `json:"icon" required:"true"`
}

type GettableServicesMetadata struct {
	Services []*ServiceMetadata `json:"services"`
	Services []*ServiceMetadata `json:"services" required:"true" nullable:"false"`
}

type Service struct {
	ServiceDefinition
	ServiceConfig *ServiceConfig `json:"serviceConfig"`
	ServiceConfig *ServiceConfig `json:"serviceConfig" required:"false" nullable:"false"`
}

type GettableService = Service

type UpdatableService struct {
	Config *ServiceConfig `json:"config"`
	Config *ServiceConfig `json:"config" required:"true" nullable:"false"`
}

type ServiceConfig struct {
	AWS *AWSServiceConfig `json:"aws,omitempty"`
type ServiceDefinition struct {
	ServiceDefinitionMetadata
	Overview         string              `json:"overview" required:"true"` // markdown
	Assets           Assets              `json:"assets" required:"true"`
	SupportedSignals SupportedSignals    `json:"supported_signals" required:"true"`
	DataCollected    DataCollected       `json:"dataCollected" required:"true"`
	Strategy         *CollectionStrategy `json:"telemetryCollectionStrategy" required:"true" nullable:"false"`
}

// SupportedSignals for cloud provider's service.
type SupportedSignals struct {
	Logs    bool `json:"logs"`
	Metrics bool `json:"metrics"`
}

// DataCollected is curated static list of metrics and logs, this is shown as part of service overview.
type DataCollected struct {
	Logs    []CollectedLogAttribute `json:"logs"`
	Metrics []CollectedMetric       `json:"metrics"`
}

// CollectionStrategy is cloud provider specific configuration for signal collection,
// this is used by agent to understand the nitty-gritty for collecting telemetry for the cloud provider.
type CollectionStrategy struct {
	AWS *AWSCollectionStrategy `json:"aws" required:"true" nullable:"false"`
}

type AWSServiceConfig struct {
@@ -70,45 +99,11 @@ type AWSServiceMetricsConfig struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
}
|
||||
|
||||
// ServiceDefinitionMetadata represents service definition metadata. This is useful for showing service tab in frontend.
|
||||
type ServiceDefinitionMetadata struct {
|
||||
Id string `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Icon string `json:"icon"`
|
||||
}
|
||||
|
||||
type ServiceDefinition struct {
|
||||
ServiceDefinitionMetadata
|
||||
Overview string `json:"overview"` // markdown
|
||||
Assets Assets `json:"assets"`
|
||||
SupportedSignals SupportedSignals `json:"supported_signals"`
|
||||
DataCollected DataCollected `json:"dataCollected"`
|
||||
Strategy *CollectionStrategy `json:"telemetryCollectionStrategy"`
|
||||
}
|
||||
|
||||
// CollectionStrategy is cloud provider specific configuration for signal collection,
|
||||
// this is used by agent to understand the nitty-gritty for collecting telemetry for the cloud provider.
|
||||
type CollectionStrategy struct {
|
||||
AWS *AWSCollectionStrategy `json:"aws,omitempty"`
|
||||
}
|
||||
|
||||
// Assets represents the collection of dashboards.
|
||||
type Assets struct {
|
||||
Dashboards []Dashboard `json:"dashboards"`
|
||||
}
|
||||
|
||||
// SupportedSignals for cloud provider's service.
|
||||
type SupportedSignals struct {
|
||||
Logs bool `json:"logs"`
|
||||
Metrics bool `json:"metrics"`
|
||||
}
|
||||
|
||||
// DataCollected is curated static list of metrics and logs, this is shown as part of service overview.
|
||||
type DataCollected struct {
|
||||
Logs []CollectedLogAttribute `json:"logs"`
|
||||
Metrics []CollectedMetric `json:"metrics"`
|
||||
}
|
||||
|
||||
// CollectedLogAttribute represents a log attribute that is present in all log entries for a service,
|
||||
// this is shown as part of service overview.
|
||||
type CollectedLogAttribute struct {
|
||||
@@ -169,56 +164,23 @@ type AWSLogsStrategy struct {
|
||||
// This is used to show available pre-made dashboards for a service,
|
||||
// hence has additional fields like id, title and description
|
||||
type Dashboard struct {
|
||||
Id string `json:"id"`
|
||||
ID string `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Description string `json:"description"`
|
||||
Definition dashboardtypes.StorableDashboardData `json:"definition,omitempty"`
|
||||
}
|
||||
|
||||
// SupportedServices is the map of supported services for each cloud provider.
|
||||
var SupportedServices = map[CloudProviderType][]ServiceID{
|
||||
CloudProviderTypeAWS: {
|
||||
{valuer.NewString("alb")},
|
||||
{valuer.NewString("api-gateway")},
|
||||
{valuer.NewString("dynamodb")},
|
||||
{valuer.NewString("ec2")},
|
||||
{valuer.NewString("ecs")},
|
||||
{valuer.NewString("eks")},
|
||||
{valuer.NewString("elasticache")},
|
||||
{valuer.NewString("lambda")},
|
||||
{valuer.NewString("msk")},
|
||||
{valuer.NewString("rds")},
|
||||
{valuer.NewString("s3sync")},
|
||||
{valuer.NewString("sns")},
|
||||
{valuer.NewString("sqs")},
|
||||
},
|
||||
}
|
||||
|
||||
// NewServiceID returns a new ServiceID from a string, validated against the supported services for the given cloud provider.
|
||||
func NewServiceID(provider CloudProviderType, service string) (ServiceID, error) {
|
||||
services, ok := SupportedServices[provider]
|
||||
if !ok {
|
||||
return ServiceID{}, errors.NewInvalidInputf(ErrCodeInvalidServiceID, "no services defined for cloud provider: %s", provider)
|
||||
}
|
||||
for _, s := range services {
|
||||
if s.StringValue() == service {
|
||||
return s, nil
|
||||
}
|
||||
}
|
||||
return ServiceID{}, errors.NewInvalidInputf(ErrCodeInvalidServiceID, "invalid service id %q for cloud provider %s", service, provider)
|
||||
}
|
||||
|
||||
// UTILS
|
||||
|
||||
// GetCloudIntegrationDashboardID returns the dashboard id for a cloud integration, given the cloud provider, service id, and dashboard id.
|
||||
// This is used to generate unique dashboard ids for cloud integration, and also to parse the dashboard id to get the cloud provider and service id when needed.
|
||||
func GetCloudIntegrationDashboardID(cloudProvider CloudProviderType, svcId, dashboardId string) string {
|
||||
return fmt.Sprintf("cloud-integration--%s--%s--%s", cloudProvider, svcId, dashboardId)
|
||||
func GetCloudIntegrationDashboardID(cloudProvider CloudProviderType, svcID, dashboardID string) string {
|
||||
return fmt.Sprintf("cloud-integration--%s--%s--%s", cloudProvider, svcID, dashboardID)
|
||||
}
|
||||
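A quick usage sketch of the generated id (values are illustrative, and this assumes the AWS provider type renders as "aws" when formatted):

    id := GetCloudIntegrationDashboardID(CloudProviderTypeAWS, "rds", "overview")
    // id == "cloud-integration--aws--rds--overview"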
// GetDashboardsFromAssets returns the list of dashboards for the cloud provider service from definition.
func GetDashboardsFromAssets(
    svcId string,
    svcID string,
    orgID valuer.UUID,
    cloudProvider CloudProviderType,
    createdAt time.Time,

@@ -229,7 +191,7 @@ func GetDashboardsFromAssets(
    for _, d := range assets.Dashboards {
        author := fmt.Sprintf("%s-integration", cloudProvider)
        dashboards = append(dashboards, &dashboardtypes.Dashboard{
            ID: GetCloudIntegrationDashboardID(cloudProvider, svcId, d.Id),
            ID: GetCloudIntegrationDashboardID(cloudProvider, svcID, d.ID),
            Locked: true,
            OrgID: orgID,
            Data: d.Definition,

75
pkg/types/cloudintegrationtypes/serviceid.go
Normal file
@@ -0,0 +1,75 @@
package cloudintegrationtypes

import (
    "github.com/SigNoz/signoz/pkg/errors"
    "github.com/SigNoz/signoz/pkg/valuer"
)

type ServiceID struct{ valuer.String }

var (
    AWSServiceALB = ServiceID{valuer.NewString("alb")}
    AWSServiceAPIGateway = ServiceID{valuer.NewString("api-gateway")}
    AWSServiceDynamoDB = ServiceID{valuer.NewString("dynamodb")}
    AWSServiceEC2 = ServiceID{valuer.NewString("ec2")}
    AWSServiceECS = ServiceID{valuer.NewString("ecs")}
    AWSServiceEKS = ServiceID{valuer.NewString("eks")}
    AWSServiceElastiCache = ServiceID{valuer.NewString("elasticache")}
    AWSServiceLambda = ServiceID{valuer.NewString("lambda")}
    AWSServiceMSK = ServiceID{valuer.NewString("msk")}
    AWSServiceRDS = ServiceID{valuer.NewString("rds")}
    AWSServiceS3Sync = ServiceID{valuer.NewString("s3sync")}
    AWSServiceSNS = ServiceID{valuer.NewString("sns")}
    AWSServiceSQS = ServiceID{valuer.NewString("sqs")}
)

func (ServiceID) Enum() []any {
    return []any{
        AWSServiceALB,
        AWSServiceAPIGateway,
        AWSServiceDynamoDB,
        AWSServiceEC2,
        AWSServiceECS,
        AWSServiceEKS,
        AWSServiceElastiCache,
        AWSServiceLambda,
        AWSServiceMSK,
        AWSServiceRDS,
        AWSServiceS3Sync,
        AWSServiceSNS,
        AWSServiceSQS,
    }
}

// SupportedServices is the map of supported services for each cloud provider.
var SupportedServices = map[CloudProviderType][]ServiceID{
    CloudProviderTypeAWS: {
        AWSServiceALB,
        AWSServiceAPIGateway,
        AWSServiceDynamoDB,
        AWSServiceEC2,
        AWSServiceECS,
        AWSServiceEKS,
        AWSServiceElastiCache,
        AWSServiceLambda,
        AWSServiceMSK,
        AWSServiceRDS,
        AWSServiceS3Sync,
        AWSServiceSNS,
        AWSServiceSQS,
    },
}

// NewServiceID returns a new ServiceID from a string, validated against the supported services for the given cloud provider.
func NewServiceID(provider CloudProviderType, service string) (ServiceID, error) {
    services, ok := SupportedServices[provider]
    if !ok {
        return ServiceID{}, errors.NewInvalidInputf(ErrCodeInvalidServiceID, "no services defined for cloud provider: %s", provider)
    }
    for _, s := range services {
        if s.StringValue() == service {
            return s, nil
        }
    }
    return ServiceID{}, errors.NewInvalidInputf(ErrCodeInvalidServiceID, "invalid service id %q for cloud provider %s", service, provider)
}
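A minimal usage sketch of the validated constructor:

    svc, err := NewServiceID(CloudProviderTypeAWS, "rds")
    if err != nil {
        // unknown ids (e.g. "redshift") fail here with code invalid_service_id
        return err
    }
    _ = svc // svc now equals AWSServiceRDS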
6
pkg/types/exporttypes/rawdataexport.go
Normal file
@@ -0,0 +1,6 @@
package exporttypes

type ExportRawDataFormatQueryParam struct {
    // Format specifies the output format: "csv" or "jsonl".
    Format string `query:"format,default=csv" default:"csv" enum:"csv,jsonl" description:"The output format for the export."`
}
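Assuming the tag-driven query binder behaves as the struct tags describe (the binder itself is not part of this diff), the parameter resolves roughly like this hand-rolled sketch; parseExportFormat is hypothetical, and net/http plus the SigNoz errors package are assumed:

    // Sketch of the expected binding behaviour: default to "csv",
    // accept only "csv" or "jsonl".
    func parseExportFormat(r *http.Request) (string, error) {
        format := r.URL.Query().Get("format")
        if format == "" {
            format = "csv" // default from the struct tag
        }
        if format != "csv" && format != "jsonl" {
            return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported format %q", format)
        }
        return format, nil
    }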
@@ -393,6 +393,77 @@ func (r *QueryRangeRequest) HasOrderSpecified() bool {
    return false
}

// UseDefaultOrderBy applies UseDefaultOrderByForListQuery to every query in the
// composite query when the request type is a list query (raw, raw_stream, trace).
func (r *QueryRangeRequest) UseDefaultOrderBy() {
    // Based on the request type, handle default order-bys
    switch r.RequestType {
    case RequestTypeRaw, RequestTypeRawStream, RequestTypeTrace:
        for idx := range r.CompositeQuery.Queries {
            r.CompositeQuery.Queries[idx].UseDefaultOrderByForListQuery()
        }
    }
}

// UseDefaultOrderByForListQuery applies a default timestamp-descending order
// for list/raw queries when no explicit order is specified. This is intended
// for raw data listing endpoints (e.g. export, list views) where a sensible
// default sort is needed, not for aggregation or timeseries queries.
func (q *QueryEnvelope) UseDefaultOrderByForListQuery() {
    if len(q.GetOrder()) > 0 {
        return
    }

    switch q.Spec.(type) {
    case QueryBuilderQuery[TraceAggregation],
        QueryBuilderTraceOperator:
        q.SetOrder(
            []OrderBy{
                {
                    Key: OrderByKey{
                        TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
                            Name: "timestamp",
                            Signal: telemetrytypes.SignalTraces,
                            FieldContext: telemetrytypes.FieldContextSpan,
                            FieldDataType: telemetrytypes.FieldDataTypeNumber,
                        },
                    },
                    Direction: OrderDirectionDesc,
                },
            },
        )
    case QueryBuilderQuery[LogAggregation]:
        q.SetOrder(
            []OrderBy{
                {
                    Key: OrderByKey{
                        TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
                            Name: "timestamp",
                            Signal: telemetrytypes.SignalLogs,
                            FieldContext: telemetrytypes.FieldContextLog,
                            FieldDataType: telemetrytypes.FieldDataTypeNumber,
                        },
                    },
                    Direction: OrderDirectionDesc,
                },
                {
                    Key: OrderByKey{
                        TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
                            Name: "id",
                            Signal: telemetrytypes.SignalLogs,
                            FieldContext: telemetrytypes.FieldContextLog,
                            FieldDataType: telemetrytypes.FieldDataTypeString,
                        },
                    },
                    Direction: OrderDirectionDesc,
                },
            },
        )
    }
}
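The intended call site is straightforward; a short sketch (req is assumed to be a *QueryRangeRequest built elsewhere):

    // For list-style requests with no explicit order, this injects
    // timestamp DESC (plus id DESC for logs) before the query executes.
    req.RequestType = RequestTypeRaw
    req.UseDefaultOrderBy()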
func (r *QueryRangeRequest) FuncsForQuery(name string) []Function {
    funcs := []Function{}
    for _, query := range r.CompositeQuery.Queries {

@@ -437,6 +508,16 @@ func (r *QueryRangeRequest) IsAnomalyRequest() (*QueryBuilderQuery[MetricAggrega
    return &q, hasAnomaly
}

func (r *QueryRangeRequest) TraceOperatorQueryIndex() int {
    for idx, query := range r.CompositeQuery.Queries {
        switch query.Spec.(type) {
        case QueryBuilderTraceOperator:
            return idx
        }
    }
    return -1
}

// We do not support fill gaps for these queries. Maybe support in future?
func (r *QueryRangeRequest) SkipFillGaps(name string) bool {
    for _, query := range r.CompositeQuery.Queries {

379
pkg/types/querybuildertypes/querybuildertypesv5/req_getters.go
Normal file
@@ -0,0 +1,379 @@
package querybuildertypesv5

import "github.com/SigNoz/signoz/pkg/types/telemetrytypes"

// GetExpression returns the expression string.
func (q *QueryEnvelope) GetExpression() string {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.Expression
    case QueryBuilderFormula:
        return spec.Expression
    }
    return ""
}

// GetReturnSpansFrom returns the return-spans-from value.
func (q *QueryEnvelope) GetReturnSpansFrom() string {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.ReturnSpansFrom
    }
    return ""
}

// GetSignal returns the signal.
func (q *QueryEnvelope) GetSignal() telemetrytypes.Signal {
    switch spec := q.Spec.(type) {
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Signal
    case QueryBuilderQuery[LogAggregation]:
        return spec.Signal
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Signal
    }
    return telemetrytypes.SignalUnspecified
}

// GetSource returns the source.
func (q *QueryEnvelope) GetSource() telemetrytypes.Source {
    switch spec := q.Spec.(type) {
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Source
    case QueryBuilderQuery[LogAggregation]:
        return spec.Source
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Source
    }
    return telemetrytypes.SourceUnspecified
}

// GetQuery returns the raw query string.
func (q *QueryEnvelope) GetQuery() string {
    switch spec := q.Spec.(type) {
    case PromQuery:
        return spec.Query
    case ClickHouseQuery:
        return spec.Query
    }
    return ""
}

// GetStats returns the PromQL stats flag.
func (q *QueryEnvelope) GetStats() bool {
    switch spec := q.Spec.(type) {
    case PromQuery:
        return spec.Stats
    }
    return false
}

// GetLeft returns the left query reference of a join.
func (q *QueryEnvelope) GetLeft() QueryRef {
    switch spec := q.Spec.(type) {
    case QueryBuilderJoin:
        return spec.Left
    }
    return QueryRef{}
}

// GetRight returns the right query reference of a join.
func (q *QueryEnvelope) GetRight() QueryRef {
    switch spec := q.Spec.(type) {
    case QueryBuilderJoin:
        return spec.Right
    }
    return QueryRef{}
}

// GetJoinType returns the join type.
func (q *QueryEnvelope) GetJoinType() JoinType {
    switch spec := q.Spec.(type) {
    case QueryBuilderJoin:
        return spec.Type
    }
    return JoinType{}
}

// GetOn returns the join ON condition.
func (q *QueryEnvelope) GetOn() string {
    switch spec := q.Spec.(type) {
    case QueryBuilderJoin:
        return spec.On
    }
    return ""
}

// GetQueryName returns the name of the spec.
func (q *QueryEnvelope) GetQueryName() string {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.Name
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Name
    case QueryBuilderQuery[LogAggregation]:
        return spec.Name
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Name
    case QueryBuilderFormula:
        return spec.Name
    case QueryBuilderJoin:
        return spec.Name
    case PromQuery:
        return spec.Name
    case ClickHouseQuery:
        return spec.Name
    }
    return ""
}

// IsDisabled returns whether the spec is disabled.
func (q *QueryEnvelope) IsDisabled() bool {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.Disabled
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Disabled
    case QueryBuilderQuery[LogAggregation]:
        return spec.Disabled
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Disabled
    case QueryBuilderFormula:
        return spec.Disabled
    case QueryBuilderJoin:
        return spec.Disabled
    case PromQuery:
        return spec.Disabled
    case ClickHouseQuery:
        return spec.Disabled
    }
    return false
}

// GetLimit returns the row limit.
func (q *QueryEnvelope) GetLimit() int {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.Limit
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Limit
    case QueryBuilderQuery[LogAggregation]:
        return spec.Limit
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Limit
    case QueryBuilderFormula:
        return spec.Limit
    case QueryBuilderJoin:
        return spec.Limit
    }
    return 0
}

// GetOffset returns the row offset.
func (q *QueryEnvelope) GetOffset() int {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.Offset
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Offset
    case QueryBuilderQuery[LogAggregation]:
        return spec.Offset
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Offset
    }
    return 0
}

// GetType returns the QueryType of the envelope.
func (q *QueryEnvelope) GetType() QueryType {
    return q.Type
}

// GetOrder returns the order-by clauses.
func (q *QueryEnvelope) GetOrder() []OrderBy {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.Order
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Order
    case QueryBuilderQuery[LogAggregation]:
        return spec.Order
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Order
    case QueryBuilderFormula:
        return spec.Order
    case QueryBuilderJoin:
        return spec.Order
    }
    return nil
}

// GetGroupBy returns the group-by keys.
func (q *QueryEnvelope) GetGroupBy() []GroupByKey {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.GroupBy
    case QueryBuilderQuery[TraceAggregation]:
        return spec.GroupBy
    case QueryBuilderQuery[LogAggregation]:
        return spec.GroupBy
    case QueryBuilderQuery[MetricAggregation]:
        return spec.GroupBy
    case QueryBuilderJoin:
        return spec.GroupBy
    }
    return nil
}

// GetFilter returns the filter.
func (q *QueryEnvelope) GetFilter() *Filter {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.Filter
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Filter
    case QueryBuilderQuery[LogAggregation]:
        return spec.Filter
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Filter
    case QueryBuilderJoin:
        return spec.Filter
    }
    return nil
}

// GetHaving returns the having clause.
func (q *QueryEnvelope) GetHaving() *Having {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.Having
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Having
    case QueryBuilderQuery[LogAggregation]:
        return spec.Having
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Having
    case QueryBuilderFormula:
        return spec.Having
    case QueryBuilderJoin:
        return spec.Having
    }
    return nil
}

// GetFunctions returns the post-processing functions.
func (q *QueryEnvelope) GetFunctions() []Function {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.Functions
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Functions
    case QueryBuilderQuery[LogAggregation]:
        return spec.Functions
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Functions
    case QueryBuilderFormula:
        return spec.Functions
    case QueryBuilderJoin:
        return spec.Functions
    }
    return nil
}

// GetSelectFields returns the selected fields.
func (q *QueryEnvelope) GetSelectFields() []telemetrytypes.TelemetryFieldKey {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.SelectFields
    case QueryBuilderQuery[TraceAggregation]:
        return spec.SelectFields
    case QueryBuilderQuery[LogAggregation]:
        return spec.SelectFields
    case QueryBuilderQuery[MetricAggregation]:
        return spec.SelectFields
    case QueryBuilderJoin:
        return spec.SelectFields
    }
    return nil
}

// GetLegend returns the legend label.
func (q *QueryEnvelope) GetLegend() string {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.Legend
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Legend
    case QueryBuilderQuery[LogAggregation]:
        return spec.Legend
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Legend
    case QueryBuilderFormula:
        return spec.Legend
    case PromQuery:
        return spec.Legend
    case ClickHouseQuery:
        return spec.Legend
    }
    return ""
}

// GetCursor returns the pagination cursor.
func (q *QueryEnvelope) GetCursor() string {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.Cursor
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Cursor
    case QueryBuilderQuery[LogAggregation]:
        return spec.Cursor
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Cursor
    }
    return ""
}

// GetStepInterval returns the step interval.
func (q *QueryEnvelope) GetStepInterval() Step {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        return spec.StepInterval
    case QueryBuilderQuery[TraceAggregation]:
        return spec.StepInterval
    case QueryBuilderQuery[LogAggregation]:
        return spec.StepInterval
    case QueryBuilderQuery[MetricAggregation]:
        return spec.StepInterval
    case PromQuery:
        return spec.Step
    }
    return Step{}
}

// GetSecondaryAggregations returns the secondary aggregations.
func (q *QueryEnvelope) GetSecondaryAggregations() []SecondaryAggregation {
    switch spec := q.Spec.(type) {
    case QueryBuilderQuery[TraceAggregation]:
        return spec.SecondaryAggregations
    case QueryBuilderQuery[LogAggregation]:
        return spec.SecondaryAggregations
    case QueryBuilderQuery[MetricAggregation]:
        return spec.SecondaryAggregations
    case QueryBuilderJoin:
        return spec.SecondaryAggregations
    }
    return nil
}

// GetLimitBy returns the limit-by configuration.
func (q *QueryEnvelope) GetLimitBy() *LimitBy {
    switch spec := q.Spec.(type) {
    case QueryBuilderQuery[TraceAggregation]:
        return spec.LimitBy
    case QueryBuilderQuery[LogAggregation]:
        return spec.LimitBy
    case QueryBuilderQuery[MetricAggregation]:
        return spec.LimitBy
    }
    return nil
}
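These getters centralize the spec type switch so call sites can treat heterogeneous envelopes uniformly; a sketch (req is assumed to be a *QueryRangeRequest, and the fmt import is assumed):

    for _, env := range req.CompositeQuery.Queries {
        if env.IsDisabled() {
            continue
        }
        fmt.Printf("%s: limit=%d, order clauses=%d\n",
            env.GetQueryName(), env.GetLimit(), len(env.GetOrder()))
    }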
452
pkg/types/querybuildertypes/querybuildertypesv5/req_setters.go
Normal file
@@ -0,0 +1,452 @@
package querybuildertypesv5

import "github.com/SigNoz/signoz/pkg/types/telemetrytypes"

// SetExpression sets the expression string of the spec, if applicable.
func (q *QueryEnvelope) SetExpression(expression string) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.Expression = expression
        q.Spec = spec
    case QueryBuilderFormula:
        spec.Expression = expression
        q.Spec = spec
    }
}

// SetReturnSpansFrom sets the return-spans-from value, if applicable.
func (q *QueryEnvelope) SetReturnSpansFrom(returnSpansFrom string) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.ReturnSpansFrom = returnSpansFrom
        q.Spec = spec
    }
}

// SetSignal sets the signal of the spec, if applicable.
func (q *QueryEnvelope) SetSignal(signal telemetrytypes.Signal) {
    switch spec := q.Spec.(type) {
    case QueryBuilderQuery[TraceAggregation]:
        spec.Signal = signal
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.Signal = signal
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.Signal = signal
        q.Spec = spec
    }
}

// SetSource sets the source of the spec, if applicable.
func (q *QueryEnvelope) SetSource(source telemetrytypes.Source) {
    switch spec := q.Spec.(type) {
    case QueryBuilderQuery[TraceAggregation]:
        spec.Source = source
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.Source = source
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.Source = source
        q.Spec = spec
    }
}

// SetQuery sets the raw query string of the spec, if applicable.
func (q *QueryEnvelope) SetQuery(query string) {
    switch spec := q.Spec.(type) {
    case PromQuery:
        spec.Query = query
        q.Spec = spec
    case ClickHouseQuery:
        spec.Query = query
        q.Spec = spec
    }
}

// SetStats sets the PromQL stats flag, if applicable.
func (q *QueryEnvelope) SetStats(stats bool) {
    switch spec := q.Spec.(type) {
    case PromQuery:
        spec.Stats = stats
        q.Spec = spec
    }
}

// SetLeft sets the left query reference of a join, if applicable.
func (q *QueryEnvelope) SetLeft(left QueryRef) {
    switch spec := q.Spec.(type) {
    case QueryBuilderJoin:
        spec.Left = left
        q.Spec = spec
    }
}

// SetRight sets the right query reference of a join, if applicable.
func (q *QueryEnvelope) SetRight(right QueryRef) {
    switch spec := q.Spec.(type) {
    case QueryBuilderJoin:
        spec.Right = right
        q.Spec = spec
    }
}

// SetJoinType sets the join type, if applicable.
func (q *QueryEnvelope) SetJoinType(joinType JoinType) {
    switch spec := q.Spec.(type) {
    case QueryBuilderJoin:
        spec.Type = joinType
        q.Spec = spec
    }
}

// SetOn sets the join ON condition, if applicable.
func (q *QueryEnvelope) SetOn(on string) {
    switch spec := q.Spec.(type) {
    case QueryBuilderJoin:
        spec.On = on
        q.Spec = spec
    }
}

// SetQueryName sets the name of the spec, if applicable.
func (q *QueryEnvelope) SetQueryName(name string) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.Name = name
        q.Spec = spec
    case QueryBuilderQuery[TraceAggregation]:
        spec.Name = name
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.Name = name
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.Name = name
        q.Spec = spec
    case QueryBuilderFormula:
        spec.Name = name
        q.Spec = spec
    case QueryBuilderJoin:
        spec.Name = name
        q.Spec = spec
    case PromQuery:
        spec.Name = name
        q.Spec = spec
    case ClickHouseQuery:
        spec.Name = name
        q.Spec = spec
    }
}

// SetDisabled sets the disabled flag of the spec, if applicable.
func (q *QueryEnvelope) SetDisabled(disabled bool) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.Disabled = disabled
        q.Spec = spec
    case QueryBuilderQuery[TraceAggregation]:
        spec.Disabled = disabled
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.Disabled = disabled
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.Disabled = disabled
        q.Spec = spec
    case QueryBuilderFormula:
        spec.Disabled = disabled
        q.Spec = spec
    case QueryBuilderJoin:
        spec.Disabled = disabled
        q.Spec = spec
    case PromQuery:
        spec.Disabled = disabled
        q.Spec = spec
    case ClickHouseQuery:
        spec.Disabled = disabled
        q.Spec = spec
    }
}

// SetLimit sets the row limit of the spec, if applicable.
func (q *QueryEnvelope) SetLimit(limit int) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.Limit = limit
        q.Spec = spec
    case QueryBuilderQuery[TraceAggregation]:
        spec.Limit = limit
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.Limit = limit
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.Limit = limit
        q.Spec = spec
    case QueryBuilderFormula:
        spec.Limit = limit
        q.Spec = spec
    case QueryBuilderJoin:
        spec.Limit = limit
        q.Spec = spec
    }
}

// SetOffset sets the row offset of the spec, if applicable.
func (q *QueryEnvelope) SetOffset(offset int) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.Offset = offset
        q.Spec = spec
    case QueryBuilderQuery[TraceAggregation]:
        spec.Offset = offset
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.Offset = offset
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.Offset = offset
        q.Spec = spec
    }
}

// SetType sets the QueryType of the envelope.
func (q *QueryEnvelope) SetType(t QueryType) {
    q.Type = t
}

// SetOrder sets the order-by clauses of the spec, if applicable.
func (q *QueryEnvelope) SetOrder(order []OrderBy) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.Order = order
        q.Spec = spec
    case QueryBuilderQuery[TraceAggregation]:
        spec.Order = order
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.Order = order
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.Order = order
        q.Spec = spec
    case QueryBuilderFormula:
        spec.Order = order
        q.Spec = spec
    case QueryBuilderJoin:
        spec.Order = order
        q.Spec = spec
    }
}

// SetGroupBy sets the group-by keys of the spec, if applicable.
func (q *QueryEnvelope) SetGroupBy(groupBy []GroupByKey) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.GroupBy = groupBy
        q.Spec = spec
    case QueryBuilderQuery[TraceAggregation]:
        spec.GroupBy = groupBy
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.GroupBy = groupBy
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.GroupBy = groupBy
        q.Spec = spec
    case QueryBuilderJoin:
        spec.GroupBy = groupBy
        q.Spec = spec
    }
}

// SetFilter sets the filter of the spec, if applicable.
func (q *QueryEnvelope) SetFilter(filter *Filter) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.Filter = filter
        q.Spec = spec
    case QueryBuilderQuery[TraceAggregation]:
        spec.Filter = filter
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.Filter = filter
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.Filter = filter
        q.Spec = spec
    case QueryBuilderJoin:
        spec.Filter = filter
        q.Spec = spec
    }
}

// SetHaving sets the having clause of the spec, if applicable.
func (q *QueryEnvelope) SetHaving(having *Having) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.Having = having
        q.Spec = spec
    case QueryBuilderQuery[TraceAggregation]:
        spec.Having = having
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.Having = having
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.Having = having
        q.Spec = spec
    case QueryBuilderFormula:
        spec.Having = having
        q.Spec = spec
    case QueryBuilderJoin:
        spec.Having = having
        q.Spec = spec
    }
}

// SetFunctions sets the post-processing functions of the spec, if applicable.
func (q *QueryEnvelope) SetFunctions(functions []Function) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.Functions = functions
        q.Spec = spec
    case QueryBuilderQuery[TraceAggregation]:
        spec.Functions = functions
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.Functions = functions
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.Functions = functions
        q.Spec = spec
    case QueryBuilderFormula:
        spec.Functions = functions
        q.Spec = spec
    case QueryBuilderJoin:
        spec.Functions = functions
        q.Spec = spec
    }
}

// SetSelectFields sets the selected fields of the spec, if applicable.
func (q *QueryEnvelope) SetSelectFields(fields []telemetrytypes.TelemetryFieldKey) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.SelectFields = fields
        q.Spec = spec
    case QueryBuilderQuery[TraceAggregation]:
        spec.SelectFields = fields
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.SelectFields = fields
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.SelectFields = fields
        q.Spec = spec
    case QueryBuilderJoin:
        spec.SelectFields = fields
        q.Spec = spec
    }
}

// SetLegend sets the legend label of the spec, if applicable.
func (q *QueryEnvelope) SetLegend(legend string) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.Legend = legend
        q.Spec = spec
    case QueryBuilderQuery[TraceAggregation]:
        spec.Legend = legend
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.Legend = legend
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.Legend = legend
        q.Spec = spec
    case QueryBuilderFormula:
        spec.Legend = legend
        q.Spec = spec
    case PromQuery:
        spec.Legend = legend
        q.Spec = spec
    case ClickHouseQuery:
        spec.Legend = legend
        q.Spec = spec
    }
}

// SetCursor sets the pagination cursor of the spec, if applicable.
func (q *QueryEnvelope) SetCursor(cursor string) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.Cursor = cursor
        q.Spec = spec
    case QueryBuilderQuery[TraceAggregation]:
        spec.Cursor = cursor
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.Cursor = cursor
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.Cursor = cursor
        q.Spec = spec
    }
}

// SetStepInterval sets the step interval of the spec, if applicable.
func (q *QueryEnvelope) SetStepInterval(step Step) {
    switch spec := q.Spec.(type) {
    case QueryBuilderTraceOperator:
        spec.StepInterval = step
        q.Spec = spec
    case QueryBuilderQuery[TraceAggregation]:
        spec.StepInterval = step
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.StepInterval = step
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.StepInterval = step
        q.Spec = spec
    case PromQuery:
        spec.Step = step
        q.Spec = spec
    }
}

// SetSecondaryAggregations sets the secondary aggregations of the spec, if applicable.
func (q *QueryEnvelope) SetSecondaryAggregations(secondaryAggregations []SecondaryAggregation) {
    switch spec := q.Spec.(type) {
    case QueryBuilderQuery[TraceAggregation]:
        spec.SecondaryAggregations = secondaryAggregations
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.SecondaryAggregations = secondaryAggregations
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.SecondaryAggregations = secondaryAggregations
        q.Spec = spec
    case QueryBuilderJoin:
        spec.SecondaryAggregations = secondaryAggregations
        q.Spec = spec
    }
}

// SetLimitBy sets the limit-by configuration of the spec, if applicable.
func (q *QueryEnvelope) SetLimitBy(limitBy *LimitBy) {
    switch spec := q.Spec.(type) {
    case QueryBuilderQuery[TraceAggregation]:
        spec.LimitBy = limitBy
        q.Spec = spec
    case QueryBuilderQuery[LogAggregation]:
        spec.LimitBy = limitBy
        q.Spec = spec
    case QueryBuilderQuery[MetricAggregation]:
        spec.LimitBy = limitBy
        q.Spec = spec
    }
}
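Note the q.Spec = spec write-back in every case: the type switch binds spec to a copy of the value stored in the interface, so a mutation is lost unless the modified copy is stored back. A minimal illustration of the pitfall:

    switch spec := q.Spec.(type) {
    case PromQuery:
        spec.Legend = "p99" // mutates a local copy only
        q.Spec = spec       // without this line the change would be discarded
    }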
@@ -10,55 +10,9 @@ import (
    "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)

// queryName returns the name from any query envelope spec type.
func (e QueryEnvelope) queryName() string {
    switch spec := e.Spec.(type) {
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Name
    case QueryBuilderQuery[LogAggregation]:
        return spec.Name
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Name
    case QueryBuilderFormula:
        return spec.Name
    case QueryBuilderTraceOperator:
        return spec.Name
    case QueryBuilderJoin:
        return spec.Name
    case PromQuery:
        return spec.Name
    case ClickHouseQuery:
        return spec.Name
    }
    return ""
}

// isDisabled returns the disabled status from any query envelope spec type.
func (e QueryEnvelope) isDisabled() bool {
    switch spec := e.Spec.(type) {
    case QueryBuilderQuery[TraceAggregation]:
        return spec.Disabled
    case QueryBuilderQuery[LogAggregation]:
        return spec.Disabled
    case QueryBuilderQuery[MetricAggregation]:
        return spec.Disabled
    case QueryBuilderFormula:
        return spec.Disabled
    case QueryBuilderTraceOperator:
        return spec.Disabled
    case QueryBuilderJoin:
        return spec.Disabled
    case PromQuery:
        return spec.Disabled
    case ClickHouseQuery:
        return spec.Disabled
    }
    return false
}

// getQueryIdentifier returns a friendly identifier for a query based on its type and name/content.
func getQueryIdentifier(envelope QueryEnvelope, index int) string {
    name := envelope.queryName()
    name := envelope.GetQueryName()

    var typeLabel string
    switch envelope.Type {

@@ -89,50 +43,115 @@ const (
    MaxQueryLimit = 10000
)

// Validate performs preliminary validation on QueryBuilderQuery
func (q *QueryBuilderQuery[T]) Validate(requestType RequestType) error {
    // Validate signal
// ValidationOption is a functional option for configuring validation behaviour.
type ValidationOption func(*validationConfig)

type validationConfig struct {
    skipLimitOffsetValidation bool
    skipAggregationValidation bool
    skipHavingValidation bool
    skipAggregationOrderBy bool
    skipSelectFieldValidation bool
    skipGroupByValidation bool
}

func applyValidationOptions(opts []ValidationOption) validationConfig {
    cfg := validationConfig{}
    for _, opt := range opts {
        opt(&cfg)
    }
    return cfg
}

// WithSkipLimitOffsetValidation returns a ValidationOption that skips the limit and offset range checks.
// Use this when the caller has already validated limits and offsets with different constraints.
func WithSkipLimitOffsetValidation() ValidationOption {
    return func(cfg *validationConfig) {
        cfg.skipLimitOffsetValidation = true
    }
}

// WithSkipAggregationValidation skips aggregation validation.
// Used for raw/trace request types where aggregations are not required.
func WithSkipAggregationValidation() ValidationOption {
    return func(cfg *validationConfig) {
        cfg.skipAggregationValidation = true
    }
}

// WithSkipHavingValidation skips having-clause validation.
// Used for raw/trace request types where having clauses do not apply.
func WithSkipHavingValidation() ValidationOption {
    return func(cfg *validationConfig) {
        cfg.skipHavingValidation = true
    }
}

// WithSkipAggregationOrderBy skips the aggregation-specific order-by key validation.
// Used for raw/trace request types where order-by keys are not restricted to group-by or aggregation keys.
func WithSkipAggregationOrderBy() ValidationOption {
    return func(cfg *validationConfig) {
        cfg.skipAggregationOrderBy = true
    }
}

// WithSkipSelectFieldValidation skips select-field validation.
// Used for aggregation request types where select fields do not apply.
func WithSkipSelectFieldValidation() ValidationOption {
    return func(cfg *validationConfig) {
        cfg.skipSelectFieldValidation = true
    }
}

// WithSkipGroupByValidation skips group-by validation.
// Used for raw/trace request types where group-by does not apply.
func WithSkipGroupByValidation() ValidationOption {
    return func(cfg *validationConfig) {
        cfg.skipGroupByValidation = true
    }
}

// Validate performs preliminary validation on QueryBuilderQuery.
func (q *QueryBuilderQuery[T]) Validate(opts ...ValidationOption) error {
    cfg := applyValidationOptions(opts)

    if err := q.validateSignal(); err != nil {
        return err
    }

    if err := q.validateAggregations(requestType); err != nil {
    if err := q.validateAggregations(cfg); err != nil {
        return err
    }

    if err := q.validateGroupBy(requestType); err != nil {
    if err := q.validateGroupBy(cfg); err != nil {
        return err
    }

    // Validate limit and pagination
    if err := q.validateLimitAndPagination(); err != nil {
    if err := q.validateLimitAndPagination(cfg); err != nil {
        return err
    }

    // Validate functions
    if err := q.validateFunctions(); err != nil {
        return err
    }

    // Validate secondary aggregations
    if err := q.validateSecondaryAggregations(); err != nil {
        return err
    }

    if err := q.validateOrderBy(requestType); err != nil {
    if err := q.validateOrderBy(cfg); err != nil {
        return err
    }

    if err := q.validateSelectFields(requestType); err != nil {
    if err := q.validateSelectFields(cfg); err != nil {
        return err
    }

    return nil
}

func (q *QueryBuilderQuery[T]) validateSelectFields(requestType RequestType) error {
    // selectFields don't apply to aggregation queries, skip validation
    if requestType.IsAggregation() {
func (q *QueryBuilderQuery[T]) validateSelectFields(cfg validationConfig) error {
    if cfg.skipSelectFieldValidation {
        return nil
    }

@@ -148,9 +167,8 @@ func (q *QueryBuilderQuery[T]) validateSelectFields(requestType RequestType) err
    return nil
}

func (q *QueryBuilderQuery[T]) validateGroupBy(requestType RequestType) error {
    // groupBy doesn't apply to non-aggregation queries, skip validation
    if !requestType.IsAggregation() {
func (q *QueryBuilderQuery[T]) validateGroupBy(cfg validationConfig) error {
    if cfg.skipGroupByValidation {
        return nil
    }
    for idx, item := range q.GroupBy {

@@ -183,9 +201,8 @@ func (q *QueryBuilderQuery[T]) validateSignal() error {
    }
}

func (q *QueryBuilderQuery[T]) validateAggregations(requestType RequestType) error {
    // aggregations don't apply to non-aggregation queries, skip validation
    if !requestType.IsAggregation() {
func (q *QueryBuilderQuery[T]) validateAggregations(cfg validationConfig) error {
    if cfg.skipAggregationValidation {
        return nil
    }

@@ -272,8 +289,11 @@ func (q *QueryBuilderQuery[T]) validateAggregations(requestType RequestType) err
    return nil
}

func (q *QueryBuilderQuery[T]) validateLimitAndPagination() error {
    // Validate limit
func (q *QueryBuilderQuery[T]) validateLimitAndPagination(cfg validationConfig) error {
    if cfg.skipLimitOffsetValidation {
        return nil
    }

    if q.Limit < 0 {
        return errors.NewInvalidInputf(
            errors.CodeInvalidInput,

@@ -336,7 +356,7 @@ func (q *QueryBuilderQuery[T]) validateSecondaryAggregations() error {
    return nil
}

func (q *QueryBuilderQuery[T]) validateOrderBy(requestType RequestType) error {
func (q *QueryBuilderQuery[T]) validateOrderBy(cfg validationConfig) error {
    for i, order := range q.Order {
        // Direction validation is handled by the OrderDirection type
        if order.Direction != OrderDirectionAsc && order.Direction != OrderDirectionDesc {

@@ -355,8 +375,7 @@ func (q *QueryBuilderQuery[T]) validateOrderBy(requestType RequestType) error {
        }
    }

    // aggregation-specific order key validation only applies to aggregation queries
    if requestType.IsAggregation() {
    if !cfg.skipAggregationOrderBy {
        return q.validateOrderByForAggregation()
    }

@@ -438,8 +457,8 @@ func (q *QueryBuilderQuery[T]) validateOrderByForAggregation() error {
    return nil
}

// ValidateQueryRangeRequest validates the entire query range request
func (r *QueryRangeRequest) Validate() error {
// Validate validates the entire query range request.
func (r *QueryRangeRequest) Validate(opts ...ValidationOption) error {
    // Validate time range
    if r.RequestType != RequestTypeRawStream && r.Start >= r.End {
        return errors.NewInvalidInputf(

@@ -450,8 +469,8 @@ func (r *QueryRangeRequest) Validate() error {

    // Validate request type
    switch r.RequestType {
    case RequestTypeRaw, RequestTypeRawStream, RequestTypeTimeSeries, RequestTypeScalar, RequestTypeTrace:
        // Valid request types
    case RequestTypeRaw, RequestTypeRawStream, RequestTypeTrace, RequestTypeTimeSeries, RequestTypeScalar:
        opts = append(opts, GetValidationOptions(r.RequestType)...)
    default:
        return errors.NewInvalidInputf(
            errors.CodeInvalidInput,

@@ -463,7 +482,7 @@ func (r *QueryRangeRequest) Validate() error {
    }

    // Validate composite query
    if err := r.validateCompositeQuery(); err != nil {
    if err := r.CompositeQuery.Validate(opts...); err != nil {
        return err
    }

@@ -478,7 +497,7 @@ func (r *QueryRangeRequest) Validate() error {
// validateAllQueriesNotDisabled validates that at least one query in the composite query is enabled.
func (r *QueryRangeRequest) validateAllQueriesNotDisabled() error {
    for _, envelope := range r.CompositeQuery.Queries {
        if !envelope.isDisabled() {
        if !envelope.IsDisabled() {
            return nil
        }
    }

@@ -489,12 +508,8 @@ func (r *QueryRangeRequest) validateAllQueriesNotDisabled() error {
    )
}

func (r *QueryRangeRequest) validateCompositeQuery() error {
    return r.CompositeQuery.Validate(r.RequestType)
}

// Validate performs validation on CompositeQuery.
func (c *CompositeQuery) Validate(requestType RequestType) error {
func (c *CompositeQuery) Validate(opts ...ValidationOption) error {
    if len(c.Queries) == 0 {
        return errors.NewInvalidInputf(
            errors.CodeInvalidInput,

@@ -506,14 +521,14 @@ func (c *CompositeQuery) Validate(requestType RequestType) error {
    queryNames := make(map[string]bool)

    for i, envelope := range c.Queries {
        if err := validateQueryEnvelope(envelope, requestType); err != nil {
        if err := validateQueryEnvelope(envelope, opts...); err != nil {
            queryId := getQueryIdentifier(envelope, i)
            return wrapValidationError(err, queryId, "invalid %s: %s")
        }

        // Check name uniqueness for builder queries
        if envelope.Type == QueryTypeBuilder || envelope.Type == QueryTypeSubQuery {
            name := envelope.queryName()
            name := envelope.GetQueryName()
            if name != "" {
                if queryNames[name] {
                    return errors.NewInvalidInputf(

@@ -530,16 +545,16 @@ func (c *CompositeQuery) Validate(requestType RequestType) error {
    return nil
}

func validateQueryEnvelope(envelope QueryEnvelope, requestType RequestType) error {
func validateQueryEnvelope(envelope QueryEnvelope, opts ...ValidationOption) error {
    switch envelope.Type {
    case QueryTypeBuilder, QueryTypeSubQuery:
        switch spec := envelope.Spec.(type) {
        case QueryBuilderQuery[TraceAggregation]:
            return spec.Validate(requestType)
            return spec.Validate(opts...)
        case QueryBuilderQuery[LogAggregation]:
            return spec.Validate(requestType)
            return spec.Validate(opts...)
        case QueryBuilderQuery[MetricAggregation]:
            return spec.Validate(requestType)
            return spec.Validate(opts...)
        default:
            return errors.NewInvalidInputf(
                errors.CodeInvalidInput,

@@ -625,3 +640,14 @@ func validateQueryEnvelope(envelope QueryEnvelope, requestType RequestType) erro
            )
        }
    }

// GetValidationOptions returns the validation options implied by a request type.
func GetValidationOptions(requestType RequestType) []ValidationOption {
    switch requestType {
    case RequestTypeTimeSeries, RequestTypeScalar:
        return []ValidationOption{WithSkipSelectFieldValidation()}
    case RequestTypeRaw, RequestTypeRawStream, RequestTypeTrace:
        return []ValidationOption{WithSkipAggregationValidation(), WithSkipHavingValidation(), WithSkipAggregationOrderBy(), WithSkipGroupByValidation()}
    default:
        return []ValidationOption{}
    }
}
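A sketch of how the options compose at the two call sites (req and q are assumed to be built elsewhere):

    // Request-level: Validate derives the options from the request type itself.
    if err := req.Validate(); err != nil {
        return err
    }

    // Query-level: a caller that enforces its own limits can opt out of one check.
    if err := q.Validate(WithSkipLimitOffsetValidation()); err != nil {
        return err
    }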
@@ -743,7 +743,7 @@ func TestValidateQueryEnvelope(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := validateQueryEnvelope(tt.envelope, tt.requestType)
|
||||
err := validateQueryEnvelope(tt.envelope)
|
||||
if tt.wantErr {
|
||||
if err == nil {
|
||||
t.Errorf("validateQueryEnvelope() expected error but got none")
|
||||
@@ -816,7 +816,7 @@ func TestQueryEnvelope_Helpers(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := tt.envelope.queryName()
|
||||
got := tt.envelope.GetQueryName()
|
||||
if got != tt.want {
|
||||
t.Errorf("queryName() = %q, want %q", got, tt.want)
|
||||
}
|
||||
@@ -868,7 +868,7 @@ func TestQueryEnvelope_Helpers(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := tt.envelope.isDisabled()
|
||||
got := tt.envelope.IsDisabled()
|
||||
if got != tt.want {
|
||||
t.Errorf("isDisabled() = %v, want %v", got, tt.want)
|
||||
}
|
||||
@@ -1107,7 +1107,7 @@ func TestQueryRangeRequest_ValidateOrderByForAggregation(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := tt.query.Validate(RequestTypeTimeSeries)
|
||||
err := tt.query.Validate(GetValidationOptions(RequestTypeTimeSeries)...)
|
||||
if tt.wantErr {
|
||||
if err == nil {
|
||||
t.Errorf("validateOrderByForAggregation() expected error but got none")
|
||||
@@ -1161,7 +1161,7 @@ func TestNonAggregationFieldsSkipped(t *testing.T) {
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
||||
},
|
||||
}
|
||||
err := query.Validate(RequestTypeRaw)
|
||||
err := query.Validate(GetValidationOptions(RequestTypeRaw)...)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error for groupBy with raw request type, got: %v", err)
|
||||
}
|
||||
@@ -1178,7 +1178,7 @@ func TestNonAggregationFieldsSkipped(t *testing.T) {
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: ""}},
|
||||
},
|
||||
}
|
||||
err := query.Validate(RequestTypeTimeSeries)
|
||||
err := query.Validate(GetValidationOptions(RequestTypeTimeSeries)...)
|
||||
if err == nil {
|
||||
t.Errorf("expected error for empty groupBy key with timeseries request type")
|
||||
}
|
||||
@@ -1190,7 +1190,7 @@ func TestNonAggregationFieldsSkipped(t *testing.T) {
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Having: &Having{Expression: "count() > 10"},
|
||||
}
|
||||
err := query.Validate(RequestTypeRaw)
|
||||
err := query.Validate(GetValidationOptions(RequestTypeRaw)...)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error for having with raw request type, got: %v", err)
|
||||
}
|
||||
@@ -1202,7 +1202,7 @@ func TestNonAggregationFieldsSkipped(t *testing.T) {
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Having: &Having{Expression: "count() > 10"},
|
||||
}
|
||||
err := query.Validate(RequestTypeTrace)
|
||||
err := query.Validate(GetValidationOptions(RequestTypeTrace)...)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error for having with trace request type, got: %v", err)
|
||||
}
|
||||
@@ -1216,7 +1216,7 @@ func TestNonAggregationFieldsSkipped(t *testing.T) {
|
||||
{Expression: "count()"},
|
||||
},
|
||||
}
|
||||
err := query.Validate(RequestTypeRaw)
|
||||
err := query.Validate(GetValidationOptions(RequestTypeRaw)...)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error for aggregations with raw request type, got: %v", err)
|
||||
}
|
||||
@@ -1230,7 +1230,7 @@ func TestNonAggregationFieldsSkipped(t *testing.T) {
|
||||
{Expression: "count()"},
|
||||
},
|
||||
}
|
||||
err := query.Validate(RequestTypeRawStream)
|
||||
err := query.Validate(GetValidationOptions(RequestTypeRawStream)...)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error for aggregations with raw_stream request type, got: %v", err)
|
||||
}
|
||||
@@ -1248,12 +1248,12 @@ func TestNonAggregationFieldsSkipped(t *testing.T) {
|
||||
},
|
||||
}
|
||||
// Should error for raw (selectFields are validated)
|
||||
err := query.Validate(RequestTypeRaw)
|
||||
err := query.Validate(GetValidationOptions(RequestTypeRaw)...)
|
||||
if err == nil {
|
||||
t.Errorf("expected error for isRoot in selectFields with raw request type")
|
||||
}
|
||||
// Should pass for timeseries (selectFields skipped)
|
||||
err = query.Validate(RequestTypeTimeSeries)
|
||||
err = query.Validate(GetValidationOptions(RequestTypeTimeSeries)...)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error for isRoot in selectFields with timeseries request type, got: %v", err)
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Union

import requests

@@ -10,6 +11,95 @@ DEFAULT_TOLERANCE = 1e-9
QUERY_TIMEOUT = 30  # seconds


@dataclass
class TelemetryFieldKey:
    name: str
    field_data_type: str
    field_context: str

    def to_dict(self) -> Dict:
        return {
            "name": self.name,
            "fieldDataType": self.field_data_type,
            "fieldContext": self.field_context,
        }


@dataclass
class OrderBy:
    key: TelemetryFieldKey
    direction: str = "asc"

    def to_dict(self) -> Dict:
        return {"key": self.key.to_dict(), "direction": self.direction}


@dataclass
class BuilderQuery:
    signal: str
    name: str = "A"
    limit: Optional[int] = None
    filter_expression: Optional[str] = None
    select_fields: Optional[List[TelemetryFieldKey]] = None
    order: Optional[List[OrderBy]] = None

    def to_dict(self) -> Dict:
        spec: Dict[str, Any] = {
            "signal": self.signal,
            "name": self.name,
        }
        if self.limit is not None:
            spec["limit"] = self.limit
        if self.filter_expression:
            spec["filter"] = {"expression": self.filter_expression}
        if self.select_fields:
            spec["selectFields"] = [f.to_dict() for f in self.select_fields]
        if self.order:
            spec["order"] = [o.to_dict() for o in self.order]
        return {"type": "builder_query", "spec": spec}


@dataclass
class TraceOperatorQuery:
    name: str
    expression: str
    return_spans_from: str
    limit: Optional[int] = None
    order: Optional[List[OrderBy]] = None

    def to_dict(self) -> Dict:
        spec: Dict[str, Any] = {
            "name": self.name,
            "expression": self.expression,
            "returnSpansFrom": self.return_spans_from,
        }
        if self.limit is not None:
            spec["limit"] = self.limit
        if self.order:
            spec["order"] = [o.to_dict() for o in self.order]
        return {"type": "builder_trace_operator", "spec": spec}


@dataclass
class QueryRangeRequest:
    start: int  # nanoseconds
    end: int  # nanoseconds
    queries: List[Union[BuilderQuery, TraceOperatorQuery]]
    request_type: Optional[str] = "raw"

    def to_dict(self) -> Dict:
        body: Dict[str, Any] = {
            "start": self.start,
            "end": self.end,
            "compositeQuery": {
                "queries": [q.to_dict() for q in self.queries],
            },
        }
        if self.request_type is not None:
            body["requestType"] = self.request_type
        return body


def make_query_request(
    signoz: types.SigNoz,
    token: str,
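Taken together, these fixture dataclasses serialize into the composite-query payload that the new export tests post. A minimal sketch of how they compose, using only the classes defined above (the time window and filter expression are illustrative, not taken from any one test):

from datetime import datetime, timedelta, timezone

now = datetime.now(tz=timezone.utc)

# A raw logs query: last 5 minutes of ERROR logs, newest first, capped at 10 rows.
request = QueryRangeRequest(
    start=int((now - timedelta(minutes=5)).timestamp() * 1e9),  # nanoseconds
    end=int(now.timestamp() * 1e9),
    queries=[
        BuilderQuery(
            signal="logs",
            name="A",
            limit=10,
            filter_expression="severity_text = 'ERROR'",
            order=[OrderBy(TelemetryFieldKey("timestamp", "string", "log"), "desc")],
        )
    ],
)

# to_dict() yields the JSON body the endpoints expect:
# {"start": ..., "end": ..., "requestType": "raw",
#  "compositeQuery": {"queries": [{"type": "builder_query", "spec": {...}}]}}
body = request.to_dict()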
@@ -696,7 +696,6 @@ def test_traces_list_with_corrupt_data(
    assert response.status_code == status_code

    if response.status_code == HTTPStatus.OK:

        if not results(traces):
            # No results expected
            assert response.json()["data"]["data"]["results"][0]["rows"] is None
@@ -2026,3 +2025,136 @@ def test_traces_fill_zero_formula_with_group_by(
            expected_by_ts=expectations[service_name],
            context=f"traces/fillZero/F1/{service_name}",
        )


def test_traces_list_filter_by_trace_id(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Tests that filtering by trace_id:
    1. Returns the matching span (narrow window, single bucket).
    2. Does not return duplicate spans when the query window spans multiple
       exponential buckets (>1 h).
    3. Returns no results when the query window does not contain the trace.
    """
    target_trace_id = TraceIdGenerator.trace_id()
    other_trace_id = TraceIdGenerator.trace_id()
    span_id_root = TraceIdGenerator.span_id()
    other_span_id = TraceIdGenerator.span_id()

    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    common_resources = {
        "deployment.environment": "production",
        "service.name": "trace-filter-service",
        "cloud.provider": "integration",
    }

    insert_traces(
        [
            Traces(
                timestamp=now - timedelta(seconds=10),
                duration=timedelta(seconds=5),
                trace_id=target_trace_id,
                span_id=span_id_root,
                parent_span_id="",
                name="root-span",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources=common_resources,
                attributes={"http.request.method": "GET"},
            ),
            # span from a different trace — must not appear in results
            Traces(
                timestamp=now - timedelta(seconds=5),
                duration=timedelta(seconds=1),
                trace_id=other_trace_id,
                span_id=other_span_id,
                parent_span_id="",
                name="other-root-span",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources=common_resources,
                attributes={},
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    trace_filter = f"trace_id = '{target_trace_id}'"

    def _query(start_ms: int, end_ms: int) -> List:
        response = make_query_request(
            signoz,
            token,
            start_ms=start_ms,
            end_ms=end_ms,
            request_type="raw",
            queries=[
                {
                    "type": "builder_query",
                    "spec": {
                        "name": "A",
                        "signal": "traces",
                        "disabled": False,
                        "limit": 100,
                        "offset": 0,
                        "filter": {"expression": trace_filter},
                        "order": [{"key": {"name": "timestamp"}, "direction": "desc"}],
                        "selectFields": [
                            {
                                "name": "name",
                                "fieldDataType": "string",
                                "fieldContext": "span",
                                "signal": "traces",
                            }
                        ],
                        "having": {"expression": ""},
                        "aggregations": [{"expression": "count()"}],
                    },
                }
            ],
        )
        assert response.status_code == HTTPStatus.OK
        assert response.json()["status"] == "success"
        return response.json()["data"]["data"]["results"][0]["rows"] or []

    now_ms = int(now.timestamp() * 1000)

    # --- Test 1: narrow window (single bucket, <1 h) ---
    narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
    narrow_rows = _query(narrow_start_ms, now_ms)

    assert (
        len(narrow_rows) == 1
    ), f"Expected 1 span for trace_id filter (narrow window), got {len(narrow_rows)}"
    assert narrow_rows[0]["data"]["span_id"] == span_id_root
    assert narrow_rows[0]["data"]["trace_id"] == target_trace_id

    # --- Test 2: wide window (>1 h, triggers multiple exponential buckets) ---
    # should return exactly 1 span, with no duplicates
    wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
    wide_rows = _query(wide_start_ms, now_ms)

    assert len(wide_rows) == 1, (
        f"Expected 1 span for trace_id filter (wide window, multi-bucket), "
        f"got {len(wide_rows)} — possible duplicate-span regression"
    )
    assert wide_rows[0]["data"]["span_id"] == span_id_root
    assert wide_rows[0]["data"]["trace_id"] == target_trace_id

    # --- Test 3: window that does not contain the trace returns no results ---
    past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
    past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
    past_rows = _query(past_start_ms, past_end_ms)

    assert len(past_rows) == 0, (
        f"Expected 0 spans for trace_id filter outside time window, "
        f"got {len(past_rows)}"
    )
tests/integration/src/rawexportdata/01_logs.py (new file, 670 lines)
@@ -0,0 +1,670 @@
import csv
import io
import json
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Callable, List

import requests

from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.logs import Logs
from fixtures.querier import BuilderQuery, OrderBy, QueryRangeRequest, TelemetryFieldKey


def test_export_raw_data_get_not_allowed(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
) -> None:
    """
    Tests:
    1. GET request to export_raw_data is rejected with 405 Method Not Allowed
    """
    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    response = requests.get(
        signoz.self.host_configs["8080"].get("/api/v1/export_raw_data"),
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.METHOD_NOT_ALLOWED


def test_export_logs_csv(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[List[Logs]], None],
) -> None:
    """
    Setup:
        Insert 3 logs with different severity levels and attributes.

    Tests:
    1. Export logs as CSV format
    2. Verify CSV structure and content
    3. Validate headers are present
    4. Check log data is correctly formatted
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_logs(
        [
            Logs(
                timestamp=now - timedelta(seconds=10),
                body="Application started successfully",
                severity_text="INFO",
                resources={
                    "service.name": "api-service",
                    "deployment.environment": "production",
                    "host.name": "server-01",
                },
                attributes={
                    "http.method": "GET",
                    "http.status_code": 200,
                    "user.id": "user123",
                },
            ),
            Logs(
                timestamp=now - timedelta(seconds=8),
                body="Connection to database failed",
                severity_text="ERROR",
                resources={
                    "service.name": "api-service",
                    "deployment.environment": "production",
                    "host.name": "server-01",
                },
                attributes={
                    "error.type": "ConnectionError",
                    "db.name": "production_db",
                },
            ),
            Logs(
                timestamp=now - timedelta(seconds=5),
                body="Request processed",
                severity_text="DEBUG",
                resources={
                    "service.name": "worker-service",
                    "deployment.environment": "production",
                    "host.name": "server-02",
                },
                attributes={
                    "request.id": "req-456",
                    "duration_ms": 150.5,
                },
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        queries=[BuilderQuery(signal="logs", name="A")],
    ).to_dict()

    # Export logs as CSV (default format)
    response = requests.post(
        signoz.self.host_configs["8080"].get("/api/v1/export_raw_data"),
        json=body,
        timeout=30,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "text/csv"
    assert "attachment" in response.headers.get("Content-Disposition", "")

    # Parse CSV content
    csv_content = response.text
    csv_reader = csv.DictReader(io.StringIO(csv_content))

    rows = list(csv_reader)
    assert len(rows) == 3, f"Expected 3 rows, got {len(rows)}"

    # Verify log bodies are present in the exported data
    bodies = [row.get("body") for row in rows]
    assert "Application started successfully" in bodies
    assert "Connection to database failed" in bodies
    assert "Request processed" in bodies

    # Verify severity levels
    severities = [row.get("severity_text") for row in rows]
    assert "INFO" in severities
    assert "ERROR" in severities
    assert "DEBUG" in severities


def test_export_logs_jsonl(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[List[Logs]], None],
) -> None:
    """
    Setup:
        Insert 2 logs with different attributes.

    Tests:
    1. Export logs as JSONL format
    2. Verify JSONL structure and content
    3. Check each line is valid JSON
    4. Validate log data is correctly formatted
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_logs(
        [
            Logs(
                timestamp=now - timedelta(seconds=10),
                body="User logged in",
                severity_text="INFO",
                resources={
                    "service.name": "auth-service",
                    "deployment.environment": "staging",
                },
                attributes={
                    "user.email": "test@example.com",
                    "session.id": "sess-789",
                },
            ),
            Logs(
                timestamp=now - timedelta(seconds=5),
                body="Payment processed successfully",
                severity_text="INFO",
                resources={
                    "service.name": "payment-service",
                    "deployment.environment": "staging",
                },
                attributes={
                    "transaction.id": "txn-123",
                    "amount": 99.99,
                },
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        queries=[BuilderQuery(signal="logs", name="A")],
    ).to_dict()

    # Export logs as JSONL
    response = requests.post(
        signoz.self.host_configs["8080"].get("/api/v1/export_raw_data?format=jsonl"),
        json=body,
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"
    assert "attachment" in response.headers.get("Content-Disposition", "")

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) == 2, f"Expected 2 lines, got {len(jsonl_lines)}"

    # Verify each line is valid JSON
    json_objects = []
    for line in jsonl_lines:
        obj = json.loads(line)
        json_objects.append(obj)
        assert "id" in obj
        assert "timestamp" in obj
        assert "body" in obj
        assert "severity_text" in obj

    # Verify log bodies
    bodies = [obj.get("body") for obj in json_objects]
    assert "User logged in" in bodies
    assert "Payment processed successfully" in bodies


def test_export_logs_with_filter(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[List[Logs]], None],
) -> None:
    """
    Setup:
        Insert logs with different severity levels.

    Tests:
    1. Export logs with filter applied
    2. Verify only filtered logs are returned
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_logs(
        [
            Logs(
                timestamp=now - timedelta(seconds=10),
                body="Info message",
                severity_text="INFO",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            ),
            Logs(
                timestamp=now - timedelta(seconds=8),
                body="Error message",
                severity_text="ERROR",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            ),
            Logs(
                timestamp=now - timedelta(seconds=5),
                body="Another error message",
                severity_text="ERROR",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        queries=[
            BuilderQuery(
                signal="logs", name="A", filter_expression="severity_text = 'ERROR'"
            )
        ],
    ).to_dict()

    # Export logs with filter
    response = requests.post(
        signoz.self.host_configs["8080"].get("/api/v1/export_raw_data?format=jsonl"),
        json=body,
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) == 2, f"Expected 2 lines (filtered), got {len(jsonl_lines)}"

    # Verify only ERROR logs are returned
    for line in jsonl_lines:
        obj = json.loads(line)
        assert obj["severity_text"] == "ERROR"
        assert "error message" in obj["body"].lower()


def test_export_logs_with_limit(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[List[Logs]], None],
) -> None:
    """
    Setup:
        Insert 5 logs.

    Tests:
    1. Export logs with limit applied
    2. Verify only limited number of logs are returned
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    logs = []
    for i in range(5):
        logs.append(
            Logs(
                timestamp=now - timedelta(seconds=i),
                body=f"Log message {i}",
                severity_text="INFO",
                resources={
                    "service.name": "test-service",
                },
                attributes={
                    "index": i,
                },
            )
        )

    insert_logs(logs)

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        queries=[BuilderQuery(signal="logs", name="A", limit=3)],
    ).to_dict()

    # Export logs with limit
    response = requests.post(
        signoz.self.host_configs["8080"].get("/api/v1/export_raw_data?format=csv"),
        json=body,
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "text/csv"

    # Parse CSV content
    csv_content = response.text
    csv_reader = csv.DictReader(io.StringIO(csv_content))

    rows = list(csv_reader)
    assert len(rows) == 3, f"Expected 3 rows (limited), got {len(rows)}"


def test_export_logs_with_columns(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[List[Logs]], None],
) -> None:
    """
    Setup:
        Insert logs with various attributes.

    Tests:
    1. Export logs with specific columns
    2. Verify only specified columns are returned
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_logs(
        [
            Logs(
                timestamp=now - timedelta(seconds=10),
                body="Test log message",
                severity_text="INFO",
                resources={
                    "service.name": "test-service",
                    "deployment.environment": "production",
                },
                attributes={
                    "http.method": "GET",
                    "http.status_code": 200,
                },
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        queries=[
            BuilderQuery(
                signal="logs",
                name="A",
                select_fields=[
                    TelemetryFieldKey("timestamp", "string", "log"),
                    TelemetryFieldKey("severity_text", "string", "log"),
                    TelemetryFieldKey("body", "string", "log"),
                ],
            )
        ],
    ).to_dict()

    # Export logs with specific columns
    response = requests.post(
        signoz.self.host_configs["8080"].get("/api/v1/export_raw_data?format=csv"),
        json=body,
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "text/csv"

    # Parse CSV content
    csv_content = response.text
    csv_reader = csv.DictReader(io.StringIO(csv_content))

    rows = list(csv_reader)
    assert len(rows) == 1

    # Verify the specified columns are present
    row = rows[0]
    assert "timestamp" in row
    assert "severity_text" in row
    assert "body" in row
    assert row["severity_text"] == "INFO"
    assert row["body"] == "Test log message"


def test_export_logs_with_order_by(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[List[Logs]], None],
) -> None:
    """
    Setup:
        Insert logs at different timestamps.

    Tests:
    1. Export logs with ascending timestamp order
    2. Verify logs are returned in correct order
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_logs(
        [
            Logs(
                timestamp=now - timedelta(seconds=10),
                body="First log",
                severity_text="INFO",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            ),
            Logs(
                timestamp=now - timedelta(seconds=5),
                body="Second log",
                severity_text="INFO",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            ),
            Logs(
                timestamp=now - timedelta(seconds=1),
                body="Third log",
                severity_text="INFO",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        queries=[
            BuilderQuery(
                signal="logs",
                name="A",
                order=[OrderBy(TelemetryFieldKey("timestamp", "string", "log"), "asc")],
            )
        ],
    ).to_dict()

    # Export logs with ascending order
    response = requests.post(
        signoz.self.host_configs["8080"].get("/api/v1/export_raw_data?format=jsonl"),
        json=body,
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) == 3

    # Verify order - first log should be "First log" (oldest)
    json_objects = [json.loads(line) for line in jsonl_lines]
    assert json_objects[0]["body"] == "First log"
    assert json_objects[1]["body"] == "Second log"
    assert json_objects[2]["body"] == "Third log"


def test_export_logs_with_complex_filter(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[List[Logs]], None],
) -> None:
    """
    Setup:
        Insert logs with various service names and severity levels.

    Tests:
    1. Export logs with complex filter (multiple conditions)
    2. Verify only logs matching all conditions are returned
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_logs(
        [
            Logs(
                timestamp=now - timedelta(seconds=10),
                body="API error occurred",
                severity_text="ERROR",
                resources={
                    "service.name": "api-service",
                },
                attributes={},
            ),
            Logs(
                timestamp=now - timedelta(seconds=8),
                body="Worker info message",
                severity_text="INFO",
                resources={
                    "service.name": "worker-service",
                },
                attributes={},
            ),
            Logs(
                timestamp=now - timedelta(seconds=5),
                body="API info message",
                severity_text="INFO",
                resources={
                    "service.name": "api-service",
                },
                attributes={},
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        queries=[
            BuilderQuery(
                signal="logs",
                name="A",
                filter_expression="service.name = 'api-service' AND severity_text = 'ERROR'",
            )
        ],
    ).to_dict()

    # Export logs with complex filter
    response = requests.post(
        signoz.self.host_configs["8080"].get("/api/v1/export_raw_data?format=jsonl"),
        json=body,
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert (
        len(jsonl_lines) == 1
    ), f"Expected 1 line (complex filter), got {len(jsonl_lines)}"

    # Verify the filtered log
    filtered_obj = json.loads(jsonl_lines[0])
    assert filtered_obj["body"] == "API error occurred"
    assert filtered_obj["severity_text"] == "ERROR"
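The tests above read the whole export into memory before asserting on it, which is fine for a handful of rows. For an ad-hoc download of the same endpoint outside the test harness, a minimal sketch (the base URL and token are placeholders, and streaming via requests' iter_content is an assumption rather than anything these tests exercise):

import requests

BASE_URL = "http://localhost:8080"  # placeholder; the tests resolve this via host_configs
TOKEN = "..."  # placeholder bearer token


def download_export(body: dict, fmt: str = "csv", out_path: str = "export.csv") -> None:
    """POST a query-range body to export_raw_data and stream the result to disk."""
    response = requests.post(
        f"{BASE_URL}/api/v1/export_raw_data?format={fmt}",
        json=body,
        timeout=30,
        stream=True,  # assumption: avoids buffering large exports in memory
        headers={"authorization": f"Bearer {TOKEN}"},
    )
    response.raise_for_status()
    with open(out_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=65536):
            f.write(chunk)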
tests/integration/src/rawexportdata/02_traces.py (new file, 763 lines)
@@ -0,0 +1,763 @@
import csv
import io
import json
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Callable, List

import requests

from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.querier import (
    BuilderQuery,
    OrderBy,
    QueryRangeRequest,
    TelemetryFieldKey,
    TraceOperatorQuery,
)
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode


def test_export_raw_data_get_not_allowed(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
) -> None:
    """
    Tests:
    1. GET request to export_raw_data is rejected with 405 Method Not Allowed
    """
    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    response = requests.get(
        signoz.self.host_configs["8080"].get("/api/v1/export_raw_data"),
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.METHOD_NOT_ALLOWED


def test_export_traces_csv(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Setup:
        Insert 3 traces with different attributes.

    Tests:
    1. Export traces as CSV format
    2. Verify CSV structure and content
    3. Validate headers are present
    4. Check trace data is correctly formatted
    """
    http_service_trace_id = TraceIdGenerator.trace_id()
    http_service_span_id = TraceIdGenerator.span_id()
    http_service_db_span_id = TraceIdGenerator.span_id()
    topic_service_trace_id = TraceIdGenerator.trace_id()
    topic_service_span_id = TraceIdGenerator.span_id()

    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_traces(
        [
            Traces(
                timestamp=now - timedelta(seconds=4),
                duration=timedelta(seconds=3),
                trace_id=http_service_trace_id,
                span_id=http_service_span_id,
                parent_span_id="",
                name="POST /integration",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "deployment.environment": "production",
                    "service.name": "http-service",
                    "os.type": "linux",
                    "host.name": "linux-000",
                },
                attributes={
                    "net.transport": "IP.TCP",
                    "http.scheme": "http",
                    "http.user_agent": "Integration Test",
                    "http.request.method": "POST",
                    "http.response.status_code": "200",
                },
            ),
            Traces(
                timestamp=now - timedelta(seconds=3.5),
                duration=timedelta(seconds=0.5),
                trace_id=http_service_trace_id,
                span_id=http_service_db_span_id,
                parent_span_id=http_service_span_id,
                name="SELECT",
                kind=TracesKind.SPAN_KIND_CLIENT,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "deployment.environment": "production",
                    "service.name": "http-service",
                    "os.type": "linux",
                    "host.name": "linux-000",
                },
                attributes={
                    "db.name": "integration",
                    "db.operation": "SELECT",
                    "db.statement": "SELECT * FROM integration",
                },
            ),
            Traces(
                timestamp=now - timedelta(seconds=1),
                duration=timedelta(seconds=2),
                trace_id=topic_service_trace_id,
                span_id=topic_service_span_id,
                parent_span_id="",
                name="topic publish",
                kind=TracesKind.SPAN_KIND_PRODUCER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "deployment.environment": "production",
                    "service.name": "topic-service",
                    "os.type": "linux",
                    "host.name": "linux-001",
                },
                attributes={
                    "message.type": "SENT",
                    "messaging.operation": "publish",
                    "messaging.message.id": "001",
                },
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        queries=[BuilderQuery(signal="traces", name="A", limit=1000)],
    ).to_dict()

    # Export traces as CSV
    response = requests.post(
        signoz.self.host_configs["8080"].get("/api/v1/export_raw_data"),
        json=body,
        timeout=30,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "text/csv"
    assert "attachment" in response.headers.get("Content-Disposition", "")

    # Parse CSV content
    csv_content = response.text
    csv_reader = csv.DictReader(io.StringIO(csv_content))

    rows = list(csv_reader)
    assert len(rows) == 3, f"Expected 3 rows, got {len(rows)}"

    # Verify trace IDs are present in the exported data
    trace_ids = [row.get("trace_id") for row in rows]
    assert http_service_trace_id in trace_ids
    assert topic_service_trace_id in trace_ids

    # Verify span names are present
    span_names = [row.get("name") for row in rows]
    assert "POST /integration" in span_names
    assert "SELECT" in span_names
    assert "topic publish" in span_names


def test_export_traces_jsonl(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Setup:
        Insert 2 traces with different attributes.

    Tests:
    1. Export traces as JSONL format
    2. Verify JSONL structure and content
    3. Check each line is valid JSON
    4. Validate trace data is correctly formatted
    """
    http_service_trace_id = TraceIdGenerator.trace_id()
    http_service_span_id = TraceIdGenerator.span_id()
    topic_service_trace_id = TraceIdGenerator.trace_id()
    topic_service_span_id = TraceIdGenerator.span_id()

    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_traces(
        [
            Traces(
                timestamp=now - timedelta(seconds=4),
                duration=timedelta(seconds=3),
                trace_id=http_service_trace_id,
                span_id=http_service_span_id,
                parent_span_id="",
                name="POST /api/test",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "api-service",
                    "deployment.environment": "staging",
                },
                attributes={
                    "http.request.method": "POST",
                    "http.response.status_code": "201",
                },
            ),
            Traces(
                timestamp=now - timedelta(seconds=2),
                duration=timedelta(seconds=1),
                trace_id=topic_service_trace_id,
                span_id=topic_service_span_id,
                parent_span_id="",
                name="queue.process",
                kind=TracesKind.SPAN_KIND_CONSUMER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "queue-service",
                    "deployment.environment": "staging",
                },
                attributes={
                    "messaging.operation": "process",
                    "messaging.system": "rabbitmq",
                },
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        queries=[BuilderQuery(signal="traces", name="A", limit=1000)],
    ).to_dict()

    # Export traces as JSONL
    response = requests.post(
        signoz.self.host_configs["8080"].get("/api/v1/export_raw_data?format=jsonl"),
        json=body,
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"
    assert "attachment" in response.headers.get("Content-Disposition", "")

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) == 2, f"Expected 2 lines, got {len(jsonl_lines)}"

    # Verify each line is valid JSON
    json_objects = []
    for line in jsonl_lines:
        obj = json.loads(line)
        json_objects.append(obj)
        assert "trace_id" in obj
        assert "span_id" in obj
        assert "name" in obj

    # Verify trace IDs are present
    trace_ids = [obj.get("trace_id") for obj in json_objects]
    assert http_service_trace_id in trace_ids
    assert topic_service_trace_id in trace_ids

    # Verify span names are present
    span_names = [obj.get("name") for obj in json_objects]
    assert "POST /api/test" in span_names
    assert "queue.process" in span_names


def test_export_traces_with_filter(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Setup:
        Insert traces with different service names.

    Tests:
    1. Export traces with filter applied
    2. Verify only filtered traces are returned
    """
    service_a_trace_id = TraceIdGenerator.trace_id()
    service_a_span_id = TraceIdGenerator.span_id()
    service_b_trace_id = TraceIdGenerator.trace_id()
    service_b_span_id = TraceIdGenerator.span_id()

    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_traces(
        [
            Traces(
                timestamp=now - timedelta(seconds=4),
                duration=timedelta(seconds=1),
                trace_id=service_a_trace_id,
                span_id=service_a_span_id,
                parent_span_id="",
                name="operation-a",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "service-a",
                },
                attributes={},
            ),
            Traces(
                timestamp=now - timedelta(seconds=2),
                duration=timedelta(seconds=1),
                trace_id=service_b_trace_id,
                span_id=service_b_span_id,
                parent_span_id="",
                name="operation-b",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "service-b",
                },
                attributes={},
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        queries=[
            BuilderQuery(
                signal="traces",
                name="A",
                limit=1000,
                filter_expression="service.name = 'service-a'",
            )
        ],
    ).to_dict()

    # Export traces with filter
    response = requests.post(
        signoz.self.host_configs["8080"].get("/api/v1/export_raw_data?format=jsonl"),
        json=body,
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) == 1, f"Expected 1 line (filtered), got {len(jsonl_lines)}"

    # Verify the filtered trace
    filtered_obj = json.loads(jsonl_lines[0])
    assert filtered_obj["trace_id"] == service_a_trace_id
    assert filtered_obj["name"] == "operation-a"


def test_export_traces_with_limit(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Setup:
        Insert 5 traces.

    Tests:
    1. Export traces with limit applied
    2. Verify only limited number of traces are returned
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    traces = []
    for i in range(5):
        traces.append(
            Traces(
                timestamp=now - timedelta(seconds=i),
                duration=timedelta(seconds=1),
                trace_id=TraceIdGenerator.trace_id(),
                span_id=TraceIdGenerator.span_id(),
                parent_span_id="",
                name=f"operation-{i}",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            )
        )

    insert_traces(traces)

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        queries=[BuilderQuery(signal="traces", name="A", limit=3)],
    ).to_dict()

    # Export traces with limit
    response = requests.post(
        signoz.self.host_configs["8080"].get("/api/v1/export_raw_data?format=csv"),
        json=body,
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "text/csv"

    # Parse CSV content
    csv_content = response.text
    csv_reader = csv.DictReader(io.StringIO(csv_content))

    rows = list(csv_reader)
    assert len(rows) == 3, f"Expected 3 rows (limited), got {len(rows)}"


def test_export_traces_multiple_queries_rejected(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
) -> None:
    """
    Tests:
    1. POST with multiple builder queries but no trace operator is rejected
    2. Verify 400 error is returned
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        request_type=None,
        queries=[
            BuilderQuery(
                signal="traces",
                name="A",
                limit=1000,
                filter_expression="service.name = 'service-a'",
            ),
            BuilderQuery(
                signal="traces",
                name="B",
                limit=1000,
                filter_expression="service.name = 'service-b'",
            ),
        ],
    ).to_dict()

    url = signoz.self.host_configs["8080"].get("/api/v1/export_raw_data?format=jsonl")
    response = requests.post(
        url,
        json=body,
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.BAD_REQUEST


def test_export_traces_with_composite_query_trace_operator(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Setup:
        Insert multiple traces with parent-child relationships.

    Tests:
    1. Export traces using trace operator in composite query (POST)
    2. Verify trace operator query works correctly
    """
    parent_trace_id = TraceIdGenerator.trace_id()
    parent_span_id = TraceIdGenerator.span_id()
    child_span_id_1 = TraceIdGenerator.span_id()
    child_span_id_2 = TraceIdGenerator.span_id()

    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_traces(
        [
            Traces(
                timestamp=now - timedelta(seconds=10),
                duration=timedelta(seconds=5),
                trace_id=parent_trace_id,
                span_id=parent_span_id,
                parent_span_id="",
                name="parent-operation",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "parent-service",
                },
                attributes={
                    "operation.type": "parent",
                },
            ),
            Traces(
                timestamp=now - timedelta(seconds=9),
                duration=timedelta(seconds=2),
                trace_id=parent_trace_id,
                span_id=child_span_id_1,
                parent_span_id=parent_span_id,
                name="child-operation-1",
                kind=TracesKind.SPAN_KIND_INTERNAL,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "parent-service",
                },
                attributes={
                    "operation.type": "child",
                },
            ),
            Traces(
                timestamp=now - timedelta(seconds=7),
                duration=timedelta(seconds=1),
                trace_id=parent_trace_id,
                span_id=child_span_id_2,
                parent_span_id=parent_span_id,
                name="child-operation-2",
                kind=TracesKind.SPAN_KIND_INTERNAL,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "parent-service",
                },
                attributes={
                    "operation.type": "child",
                },
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    # A: spans with operation.type = 'parent'
    query_a = BuilderQuery(
        signal="traces",
        name="A",
        limit=1000,
        filter_expression="operation.type = 'parent'",
    )

    # B: spans with operation.type = 'child'
    query_b = BuilderQuery(
        signal="traces",
        name="B",
        limit=1000,
        filter_expression="operation.type = 'child'",
    )

    # Trace operator: find traces where A has a direct descendant B
    query_c = TraceOperatorQuery(
        name="C",
        expression="A => B",
        return_spans_from="A",
        limit=1000,
        order=[OrderBy(TelemetryFieldKey("timestamp", "string", "span"), "desc")],
    )

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        queries=[query_a, query_b, query_c],
    ).to_dict()

    url = signoz.self.host_configs["8080"].get("/api/v1/export_raw_data?format=jsonl")
    response = requests.post(
        url,
        json=body,
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) == 1, f"Expected 1 line, got {len(jsonl_lines)}"

    # Verify all returned spans belong to the matched trace
    json_objects = [json.loads(line) for line in jsonl_lines]
    trace_ids = [obj.get("trace_id") for obj in json_objects]
    assert all(tid == parent_trace_id for tid in trace_ids)

    # Verify the parent span (returnSpansFrom = "A") is present
    span_names = [obj.get("name") for obj in json_objects]
    assert "parent-operation" in span_names


def test_export_traces_with_select_fields(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Setup:
        Insert traces with various attributes.

    Tests:
    1. Export traces with specific select fields via POST
    2. Verify only specified fields are returned in the output
    """
    trace_id = TraceIdGenerator.trace_id()
    span_id = TraceIdGenerator.span_id()

    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_traces(
        [
            Traces(
                timestamp=now - timedelta(seconds=10),
                duration=timedelta(seconds=2),
                trace_id=trace_id,
                span_id=span_id,
                parent_span_id="",
                name="test-operation",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "test-service",
                    "deployment.environment": "production",
                    "host.name": "server-01",
                },
                attributes={
                    "http.method": "POST",
                    "http.status_code": "201",
                    "user.id": "user123",
                },
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    body = QueryRangeRequest(
        start=start_ns,
        end=end_ns,
        queries=[
            BuilderQuery(
                signal="traces",
                name="A",
                limit=1000,
                select_fields=[
                    TelemetryFieldKey("trace_id", "string", "span"),
                    TelemetryFieldKey("span_id", "string", "span"),
                    TelemetryFieldKey("name", "string", "span"),
                    TelemetryFieldKey("service.name", "string", "resource"),
                ],
            )
        ],
    ).to_dict()

    url = signoz.self.host_configs["8080"].get("/api/v1/export_raw_data?format=jsonl")
    response = requests.post(
        url,
        json=body,
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) == 1

    # Verify the selected fields are present
    result = json.loads(jsonl_lines[0])
    assert "trace_id" in result
    assert "span_id" in result
    assert "name" in result

    # Verify values
    assert result["trace_id"] == trace_id
    assert result["span_id"] == span_id
    assert result["name"] == "test-operation"
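For reference, the trace-operator query in the composite-query test above serializes, via the fixtures' to_dict helpers, into a fragment alongside the two builder queries. A sketch of that fragment, with field values taken from the test itself and only the layout illustrative:

# What TraceOperatorQuery(name="C", expression="A => B", return_spans_from="A", ...)
# contributes to compositeQuery["queries"], per the to_dict() implementations in the fixtures:
trace_operator_fragment = {
    "type": "builder_trace_operator",
    "spec": {
        "name": "C",
        "expression": "A => B",  # A has a direct descendant B
        "returnSpansFrom": "A",  # emit the matching A spans, not B's
        "limit": 1000,
        "order": [
            {
                "key": {
                    "name": "timestamp",
                    "fieldDataType": "string",
                    "fieldContext": "span",
                },
                "direction": "desc",
            }
        ],
    },
}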