mirror of
https://github.com/SigNoz/signoz.git
synced 2026-06-12 20:00:28 +01:00
Compare commits
7 Commits
main
...
platform-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aa454f6b2a | ||
|
|
e38ed9649f | ||
|
|
386358214c | ||
|
|
38b28ba4c5 | ||
|
|
9237e25ef8 | ||
|
|
867d626b27 | ||
|
|
0769ac0e7d |
@@ -12790,6 +12790,53 @@ paths:
|
||||
summary: Update a span mapper
|
||||
tags:
|
||||
- spanmapper
|
||||
/api/v1/stats:
|
||||
get:
|
||||
deprecated: false
|
||||
description: This endpoint returns the collected stats for the organization
|
||||
operationId: GetStats
|
||||
responses:
|
||||
"200":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
properties:
|
||||
data:
|
||||
additionalProperties: {}
|
||||
type: object
|
||||
status:
|
||||
type: string
|
||||
required:
|
||||
- status
|
||||
- data
|
||||
type: object
|
||||
description: OK
|
||||
"401":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Unauthorized
|
||||
"403":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Forbidden
|
||||
"500":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Internal Server Error
|
||||
security:
|
||||
- api_key:
|
||||
- VIEWER
|
||||
- tokenizer:
|
||||
- VIEWER
|
||||
summary: Get stats
|
||||
tags:
|
||||
- stats
|
||||
/api/v1/testChannel:
|
||||
post:
|
||||
deprecated: true
|
||||
|
||||
@@ -109,6 +109,20 @@ func (h *handler) CreateThing(rw http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
```
|
||||
|
||||
When you need an ID from `claims` as a `valuer.UUID` (for example to pass it to a module), derive it with the `Must*` constructor instead of `NewUUID` plus an error check. Claims are validated by the auth middleware, so the conversion cannot fail and the error branch would be dead code:
|
||||
|
||||
```go
|
||||
// Good — claims are pre-validated, the conversion cannot fail.
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
|
||||
// Avoid — the error path is unreachable.
|
||||
orgID, err := valuer.NewUUID(claims.OrgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
```
|
||||
|
||||
### 3. Register the handler in `signozapiserver`
|
||||
|
||||
In `pkg/apiserver/signozapiserver`, add a route in the appropriate `add*Routes` function (`addUserRoutes`, `addSessionRoutes`, `addOrgRoutes`, etc.). The pattern is:
|
||||
@@ -387,3 +401,4 @@ Note the discriminator property lives in the variants, not on the parent — the
|
||||
- **Add `nullable:"true"`** on fields that can be `null`. Pay special attention to slices and maps -- in Go these default to `nil` which serializes to `null`. If the field should always be an array, initialize it and do not mark it nullable.
|
||||
- **Implement `Enum()`** on every type that has a fixed set of acceptable values so the JSON schema generates proper `enum` constraints.
|
||||
- **Add request examples** via `RequestExamples` in `OpenAPIDef` for any non-trivial endpoint. See `pkg/apiserver/signozapiserver/querier.go` for reference.
|
||||
- **Derive IDs from `claims` with `valuer.MustNewUUID`** (e.g. `claims.OrgID`, `claims.UserID`). Claims are pre-validated by the auth middleware, so use the `Must*` constructor — don't write `NewUUID` followed by an `if err != nil { render.Error(...); return }` block.
|
||||
|
||||
@@ -9746,6 +9746,19 @@ export type UpdateSpanMapperPathParameters = {
|
||||
groupId: string;
|
||||
mapperId: string;
|
||||
};
|
||||
export type GetStats200Data = { [key: string]: unknown };
|
||||
|
||||
export type GetStats200 = {
|
||||
/**
|
||||
* @type object
|
||||
*/
|
||||
data: GetStats200Data;
|
||||
/**
|
||||
* @type string
|
||||
*/
|
||||
status: string;
|
||||
};
|
||||
|
||||
export type GetTraceAggregationsPathParameters = {
|
||||
traceID: string;
|
||||
};
|
||||
|
||||
96
frontend/src/api/generated/services/stats/index.ts
Normal file
96
frontend/src/api/generated/services/stats/index.ts
Normal file
@@ -0,0 +1,96 @@
|
||||
/**
|
||||
* ! Do not edit manually
|
||||
* * The file has been auto-generated using Orval for SigNoz
|
||||
* * regenerate with 'pnpm generate:api'
|
||||
* SigNoz
|
||||
*/
|
||||
import { useQuery } from 'react-query';
|
||||
import type {
|
||||
InvalidateOptions,
|
||||
QueryClient,
|
||||
QueryFunction,
|
||||
QueryKey,
|
||||
UseQueryOptions,
|
||||
UseQueryResult,
|
||||
} from 'react-query';
|
||||
|
||||
import type { GetStats200, RenderErrorResponseDTO } from '../sigNoz.schemas';
|
||||
|
||||
import { GeneratedAPIInstance } from '../../../generatedAPIInstance';
|
||||
import type { ErrorType } from '../../../generatedAPIInstance';
|
||||
|
||||
/**
|
||||
* This endpoint returns the collected stats for the organization
|
||||
* @summary Get stats
|
||||
*/
|
||||
export const getStats = (signal?: AbortSignal) => {
|
||||
return GeneratedAPIInstance<GetStats200>({
|
||||
url: `/api/v1/stats`,
|
||||
method: 'GET',
|
||||
signal,
|
||||
});
|
||||
};
|
||||
|
||||
export const getGetStatsQueryKey = () => {
|
||||
return [`/api/v1/stats`] as const;
|
||||
};
|
||||
|
||||
export const getGetStatsQueryOptions = <
|
||||
TData = Awaited<ReturnType<typeof getStats>>,
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
>(options?: {
|
||||
query?: UseQueryOptions<Awaited<ReturnType<typeof getStats>>, TError, TData>;
|
||||
}) => {
|
||||
const { query: queryOptions } = options ?? {};
|
||||
|
||||
const queryKey = queryOptions?.queryKey ?? getGetStatsQueryKey();
|
||||
|
||||
const queryFn: QueryFunction<Awaited<ReturnType<typeof getStats>>> = ({
|
||||
signal,
|
||||
}) => getStats(signal);
|
||||
|
||||
return { queryKey, queryFn, ...queryOptions } as UseQueryOptions<
|
||||
Awaited<ReturnType<typeof getStats>>,
|
||||
TError,
|
||||
TData
|
||||
> & { queryKey: QueryKey };
|
||||
};
|
||||
|
||||
export type GetStatsQueryResult = NonNullable<
|
||||
Awaited<ReturnType<typeof getStats>>
|
||||
>;
|
||||
export type GetStatsQueryError = ErrorType<RenderErrorResponseDTO>;
|
||||
|
||||
/**
|
||||
* @summary Get stats
|
||||
*/
|
||||
|
||||
export function useGetStats<
|
||||
TData = Awaited<ReturnType<typeof getStats>>,
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
>(options?: {
|
||||
query?: UseQueryOptions<Awaited<ReturnType<typeof getStats>>, TError, TData>;
|
||||
}): UseQueryResult<TData, TError> & { queryKey: QueryKey } {
|
||||
const queryOptions = getGetStatsQueryOptions(options);
|
||||
|
||||
const query = useQuery(queryOptions) as UseQueryResult<TData, TError> & {
|
||||
queryKey: QueryKey;
|
||||
};
|
||||
|
||||
return { ...query, queryKey: queryOptions.queryKey };
|
||||
}
|
||||
|
||||
/**
|
||||
* @summary Get stats
|
||||
*/
|
||||
export const invalidateGetStats = async (
|
||||
queryClient: QueryClient,
|
||||
options?: InvalidateOptions,
|
||||
): Promise<QueryClient> => {
|
||||
await queryClient.invalidateQueries(
|
||||
{ queryKey: getGetStatsQueryKey() },
|
||||
options,
|
||||
);
|
||||
|
||||
return queryClient;
|
||||
};
|
||||
@@ -796,7 +796,7 @@ export const getClusterMetricsQueryPayload = (
|
||||
key: k8sDeploymentDesiredKey,
|
||||
type: 'Gauge',
|
||||
},
|
||||
aggregateOperator: 'latest',
|
||||
aggregateOperator: 'avg',
|
||||
dataSource: DataSource.METRICS,
|
||||
disabled: false,
|
||||
expression: 'B',
|
||||
@@ -839,7 +839,7 @@ export const getClusterMetricsQueryPayload = (
|
||||
reduceTo: ReduceOperators.LAST,
|
||||
spaceAggregation: 'sum',
|
||||
stepInterval: 60,
|
||||
timeAggregation: 'latest',
|
||||
timeAggregation: 'avg',
|
||||
},
|
||||
],
|
||||
queryFormulas: [],
|
||||
|
||||
@@ -40,7 +40,6 @@ import { K8S_ENTITY_EVENTS_EXPRESSION_KEY, useEntityEvents } from './hooks';
|
||||
import { getEntityEventsQueryPayload, isEventsKeyNotFoundError } from './utils';
|
||||
|
||||
import styles from './EntityEvents.module.scss';
|
||||
import { useTimezone } from 'providers/Timezone';
|
||||
|
||||
interface EventDataType {
|
||||
key: string;
|
||||
@@ -168,25 +167,17 @@ function EntityEventsContent({
|
||||
[events],
|
||||
);
|
||||
|
||||
const { formatTimezoneAdjustedTimestamp } = useTimezone();
|
||||
const columns: TableColumnsType<EventDataType> = useMemo(
|
||||
() => [
|
||||
{ title: 'Severity', dataIndex: 'severity', key: 'severity', width: 100 },
|
||||
{
|
||||
title: 'Timestamp',
|
||||
dataIndex: 'timestamp',
|
||||
width: 240,
|
||||
ellipsis: true,
|
||||
key: 'timestamp',
|
||||
render: (value: string | number): string =>
|
||||
formatTimezoneAdjustedTimestamp(
|
||||
typeof value === 'string' ? value : value / 1e6,
|
||||
),
|
||||
},
|
||||
{ title: 'Body', dataIndex: 'body', key: 'body' },
|
||||
],
|
||||
[formatTimezoneAdjustedTimestamp],
|
||||
);
|
||||
const columns: TableColumnsType<EventDataType> = [
|
||||
{ title: 'Severity', dataIndex: 'severity', key: 'severity', width: 100 },
|
||||
{
|
||||
title: 'Timestamp',
|
||||
dataIndex: 'timestamp',
|
||||
width: 240,
|
||||
ellipsis: true,
|
||||
key: 'timestamp',
|
||||
},
|
||||
{ title: 'Body', dataIndex: 'body', key: 'body' },
|
||||
];
|
||||
|
||||
const handleExpandRowIcon = ({
|
||||
expanded,
|
||||
|
||||
@@ -41,7 +41,6 @@ import { getTraceListColumns } from './traceListColumns';
|
||||
import { getEntityTracesQueryPayload } from './utils';
|
||||
|
||||
import styles from './EntityTraces.module.scss';
|
||||
import { useTimezone } from 'providers/Timezone';
|
||||
|
||||
interface Props {
|
||||
timeRange: {
|
||||
@@ -137,11 +136,7 @@ function EntityTracesContent({
|
||||
[timeRange.startTime, timeRange.endTime, userExpression],
|
||||
);
|
||||
|
||||
const { formatTimezoneAdjustedTimestamp } = useTimezone();
|
||||
const traceListColumns = getTraceListColumns(
|
||||
selectedEntityTracesColumns,
|
||||
formatTimezoneAdjustedTimestamp,
|
||||
);
|
||||
const traceListColumns = getTraceListColumns(selectedEntityTracesColumns);
|
||||
|
||||
const isKeyNotFound = isKeyNotFoundError(error);
|
||||
const isDataEmpty =
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
import { TableColumnsType as ColumnsType } from 'antd';
|
||||
import { Badge } from '@signozhq/ui/badge';
|
||||
import { Typography } from '@signozhq/ui/typography';
|
||||
import { DATE_TIME_FORMATS } from 'constants/dateTimeFormats';
|
||||
import { getMs } from 'container/Trace/Filters/Panel/PanelBody/Duration/util';
|
||||
import {
|
||||
BlockLink,
|
||||
getTraceLink,
|
||||
} from 'container/TracesExplorer/ListView/utils';
|
||||
import dayjs from 'dayjs';
|
||||
import { RowData } from 'lib/query/createTableColumnsFromQuery';
|
||||
import { BaseAutocompleteData } from 'types/api/queryBuilder/queryAutocompleteResponse';
|
||||
import { FormatTimezoneAdjustedTimestamp } from 'hooks/useTimezoneFormatter/useTimezoneFormatter';
|
||||
|
||||
const keyToLabelMap: Record<string, string> = {
|
||||
timestamp: 'Timestamp',
|
||||
@@ -58,7 +59,6 @@ const getValueForKey = (data: Record<string, any>, key: string): any => {
|
||||
|
||||
export const getTraceListColumns = (
|
||||
selectedColumns: BaseAutocompleteData[],
|
||||
formatTimezoneAdjustedTimestamp: FormatTimezoneAdjustedTimestamp,
|
||||
): ColumnsType<RowData> => {
|
||||
const columns: ColumnsType<RowData> =
|
||||
selectedColumns.map(({ dataType, key, type }) => ({
|
||||
@@ -73,8 +73,8 @@ export const getTraceListColumns = (
|
||||
if (primaryKey === 'timestamp') {
|
||||
const date =
|
||||
typeof value === 'string'
|
||||
? formatTimezoneAdjustedTimestamp(value)
|
||||
: formatTimezoneAdjustedTimestamp(value / 1e6);
|
||||
? dayjs(value).format(DATE_TIME_FORMATS.ISO_DATETIME_MS)
|
||||
: dayjs(value / 1e6).format(DATE_TIME_FORMATS.ISO_DATETIME_MS);
|
||||
|
||||
return (
|
||||
<BlockLink to={getTraceLink(itemData)} openInNewTab>
|
||||
|
||||
@@ -1366,7 +1366,7 @@ export const getPodMetricsQueryPayload = (
|
||||
orderBy: [],
|
||||
queryName: 'B',
|
||||
reduceTo: ReduceOperators.AVG,
|
||||
spaceAggregation: 'sum',
|
||||
spaceAggregation: 'avg',
|
||||
stepInterval: 60,
|
||||
timeAggregation: 'avg',
|
||||
},
|
||||
|
||||
@@ -86,9 +86,9 @@ export const k8sVolumesColumnsConfig: TableColumnDef<K8sVolumesData>[] = [
|
||||
},
|
||||
{
|
||||
id: 'capacity',
|
||||
header: 'Capacity',
|
||||
header: 'Volume Capacity',
|
||||
accessorFn: (row): number => row.volumeCapacity,
|
||||
width: { min: 140 },
|
||||
width: { min: 220 },
|
||||
enableSort: true,
|
||||
cell: ({ value }): React.ReactNode => {
|
||||
const capacity = value as number;
|
||||
@@ -105,9 +105,9 @@ export const k8sVolumesColumnsConfig: TableColumnDef<K8sVolumesData>[] = [
|
||||
},
|
||||
{
|
||||
id: 'usage',
|
||||
header: 'Used',
|
||||
header: 'Volume Utilization',
|
||||
accessorFn: (row): number => row.volumeUsage,
|
||||
width: { min: 140 },
|
||||
width: { min: 220 },
|
||||
enableSort: true,
|
||||
cell: ({ value }): React.ReactNode => {
|
||||
const usage = value as number;
|
||||
@@ -124,9 +124,9 @@ export const k8sVolumesColumnsConfig: TableColumnDef<K8sVolumesData>[] = [
|
||||
},
|
||||
{
|
||||
id: 'available',
|
||||
header: 'Available',
|
||||
header: 'Volume Available',
|
||||
accessorFn: (row): number => row.volumeAvailable,
|
||||
width: { min: 140 },
|
||||
width: { min: 220 },
|
||||
enableSort: true,
|
||||
cell: ({ value }): React.ReactNode => {
|
||||
const available = value as number;
|
||||
@@ -141,61 +141,4 @@ export const k8sVolumesColumnsConfig: TableColumnDef<K8sVolumesData>[] = [
|
||||
);
|
||||
},
|
||||
},
|
||||
{
|
||||
id: 'inodes',
|
||||
header: 'Inodes',
|
||||
accessorFn: (row): number => row.volumeInodes,
|
||||
width: { min: 140 },
|
||||
enableSort: true,
|
||||
cell: ({ value }): React.ReactNode => {
|
||||
const inodes = value as number;
|
||||
return (
|
||||
<ValidateColumnValueWrapper
|
||||
value={inodes}
|
||||
entity={InfraMonitoringEntity.VOLUMES}
|
||||
attribute="inodes metric"
|
||||
>
|
||||
<TanStackTable.Text>{inodes}</TanStackTable.Text>
|
||||
</ValidateColumnValueWrapper>
|
||||
);
|
||||
},
|
||||
},
|
||||
{
|
||||
id: 'inodesUsed',
|
||||
header: 'Inodes Used',
|
||||
accessorFn: (row): number => row.volumeInodesUsed,
|
||||
width: { min: 160 },
|
||||
enableSort: true,
|
||||
cell: ({ value }): React.ReactNode => {
|
||||
const inodesUsed = value as number;
|
||||
return (
|
||||
<ValidateColumnValueWrapper
|
||||
value={inodesUsed}
|
||||
entity={InfraMonitoringEntity.VOLUMES}
|
||||
attribute="inodes used metric"
|
||||
>
|
||||
<TanStackTable.Text>{inodesUsed}</TanStackTable.Text>
|
||||
</ValidateColumnValueWrapper>
|
||||
);
|
||||
},
|
||||
},
|
||||
{
|
||||
id: 'inodesFree',
|
||||
header: 'Inodes Free',
|
||||
accessorFn: (row): number => row.volumeInodesFree,
|
||||
width: { min: 160 },
|
||||
enableSort: true,
|
||||
cell: ({ value }): React.ReactNode => {
|
||||
const inodesFree = value as number;
|
||||
return (
|
||||
<ValidateColumnValueWrapper
|
||||
value={inodesFree}
|
||||
entity={InfraMonitoringEntity.VOLUMES}
|
||||
attribute="inodes free metric"
|
||||
>
|
||||
<TanStackTable.Text>{inodesFree}</TanStackTable.Text>
|
||||
</ValidateColumnValueWrapper>
|
||||
);
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
@@ -22,13 +22,11 @@ interface CacheEntry {
|
||||
const CACHE_SIZE_LIMIT = 1000;
|
||||
const CACHE_CLEANUP_PERCENTAGE = 0.5; // Remove 50% when limit is reached
|
||||
|
||||
export type FormatTimezoneAdjustedTimestamp = (
|
||||
input: TimestampInput,
|
||||
format?: string,
|
||||
) => string;
|
||||
|
||||
function useTimezoneFormatter({ userTimezone }: { userTimezone: Timezone }): {
|
||||
formatTimezoneAdjustedTimestamp: FormatTimezoneAdjustedTimestamp;
|
||||
formatTimezoneAdjustedTimestamp: (
|
||||
input: TimestampInput,
|
||||
format?: string,
|
||||
) => string;
|
||||
} {
|
||||
// Initialize cache using useMemo to persist between renders
|
||||
const cache = useMemo(() => new Map<string, CacheEntry>(), []);
|
||||
|
||||
@@ -19,14 +19,17 @@ import {
|
||||
} from 'components/CustomTimePicker/timezoneUtils';
|
||||
import { LOCALSTORAGE } from 'constants/localStorage';
|
||||
import useTimezoneFormatter, {
|
||||
FormatTimezoneAdjustedTimestamp,
|
||||
TimestampInput,
|
||||
} from 'hooks/useTimezoneFormatter/useTimezoneFormatter';
|
||||
|
||||
export interface TimezoneContextType {
|
||||
timezone: Timezone;
|
||||
browserTimezone: Timezone;
|
||||
updateTimezone: (timezone: Timezone) => void;
|
||||
formatTimezoneAdjustedTimestamp: FormatTimezoneAdjustedTimestamp;
|
||||
formatTimezoneAdjustedTimestamp: (
|
||||
input: TimestampInput,
|
||||
format?: string,
|
||||
) => string;
|
||||
isAdaptationEnabled: boolean;
|
||||
setIsAdaptationEnabled: Dispatch<SetStateAction<boolean>>;
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/user"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/ruler"
|
||||
"github.com/SigNoz/signoz/pkg/statsreporter"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/zeus"
|
||||
@@ -70,6 +71,7 @@ type provider struct {
|
||||
traceDetailHandler tracedetail.Handler
|
||||
rulerHandler ruler.Handler
|
||||
llmPricingRuleHandler llmpricingrule.Handler
|
||||
statsHandler statsreporter.Handler
|
||||
}
|
||||
|
||||
func NewFactory(
|
||||
@@ -102,6 +104,7 @@ func NewFactory(
|
||||
llmPricingRuleHandler llmpricingrule.Handler,
|
||||
traceDetailHandler tracedetail.Handler,
|
||||
rulerHandler ruler.Handler,
|
||||
statsHandler statsreporter.Handler,
|
||||
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
|
||||
return newProvider(
|
||||
@@ -137,6 +140,7 @@ func NewFactory(
|
||||
llmPricingRuleHandler,
|
||||
traceDetailHandler,
|
||||
rulerHandler,
|
||||
statsHandler,
|
||||
)
|
||||
})
|
||||
}
|
||||
@@ -174,6 +178,7 @@ func newProvider(
|
||||
llmPricingRuleHandler llmpricingrule.Handler,
|
||||
traceDetailHandler tracedetail.Handler,
|
||||
rulerHandler ruler.Handler,
|
||||
statsHandler statsreporter.Handler,
|
||||
) (apiserver.APIServer, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
|
||||
router := mux.NewRouter().UseEncodedPath()
|
||||
@@ -210,6 +215,7 @@ func newProvider(
|
||||
traceDetailHandler: traceDetailHandler,
|
||||
rulerHandler: rulerHandler,
|
||||
llmPricingRuleHandler: llmPricingRuleHandler,
|
||||
statsHandler: statsHandler,
|
||||
}
|
||||
|
||||
provider.authzMiddleware = middleware.NewAuthZ(settings.Logger(), orgGetter, authzService)
|
||||
@@ -334,6 +340,10 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := provider.addStatsReporterRoutes(router); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
33
pkg/apiserver/signozapiserver/statsreporter.go
Normal file
33
pkg/apiserver/signozapiserver/statsreporter.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package signozapiserver
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/http/handler"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
func (provider *provider) addStatsReporterRoutes(router *mux.Router) error {
|
||||
if err := router.Handle("/api/v1/stats", handler.New(
|
||||
provider.authzMiddleware.ViewAccess(provider.statsHandler.Get),
|
||||
handler.OpenAPIDef{
|
||||
ID: "GetStats",
|
||||
Tags: []string{"stats"},
|
||||
Summary: "Get stats",
|
||||
Description: "This endpoint returns the collected stats for the organization",
|
||||
Request: nil,
|
||||
RequestContentType: "",
|
||||
Response: map[string]any{},
|
||||
ResponseContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusOK,
|
||||
ErrorStatusCodes: []int{},
|
||||
Deprecated: false,
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
|
||||
},
|
||||
)).Methods(http.MethodGet).GetError(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -35,11 +35,7 @@ func (handler *handler) GetFeatures(rw http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
orgID, err := valuer.NewUUID(claims.OrgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
|
||||
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
|
||||
|
||||
|
||||
74
pkg/querier/collect.go
Normal file
74
pkg/querier/collect.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package querier
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrytraces"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
func (q *querier) Collect(ctx context.Context, _ valuer.UUID) (map[string]any, error) {
|
||||
stats := make(map[string]any)
|
||||
|
||||
tracesTable := fmt.Sprintf("%s.%s", telemetrytraces.DBName, telemetrytraces.SpanIndexV3TableName)
|
||||
logsTable := fmt.Sprintf("%s.%s", telemetrylogs.DBName, telemetrylogs.LogsV2TableName)
|
||||
metricsTable := fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.SamplesV4TableName)
|
||||
|
||||
var traces uint64
|
||||
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", tracesTable)).Scan(&traces); err == nil {
|
||||
stats["telemetry.traces.count"] = traces
|
||||
} else {
|
||||
q.logger.DebugContext(ctx, "failed to collect traces count", errors.Attr(err))
|
||||
}
|
||||
|
||||
var logs uint64
|
||||
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", logsTable)).Scan(&logs); err == nil {
|
||||
stats["telemetry.logs.count"] = logs
|
||||
} else {
|
||||
q.logger.DebugContext(ctx, "failed to collect logs count", errors.Attr(err))
|
||||
}
|
||||
|
||||
var metrics uint64
|
||||
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", metricsTable)).Scan(&metrics); err == nil {
|
||||
stats["telemetry.metrics.count"] = metrics
|
||||
} else {
|
||||
q.logger.DebugContext(ctx, "failed to collect metrics count", errors.Attr(err))
|
||||
}
|
||||
|
||||
var tracesLastSeenAt time.Time
|
||||
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT max(timestamp) FROM %s", tracesTable)).Scan(&tracesLastSeenAt); err == nil {
|
||||
if tracesLastSeenAt.Unix() != 0 {
|
||||
stats["telemetry.traces.last_observed.time"] = tracesLastSeenAt.UTC()
|
||||
stats["telemetry.traces.last_observed.time_unix"] = tracesLastSeenAt.Unix()
|
||||
}
|
||||
} else {
|
||||
q.logger.DebugContext(ctx, "failed to collect traces last observed", errors.Attr(err))
|
||||
}
|
||||
|
||||
var logsLastSeenAt time.Time
|
||||
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT fromUnixTimestamp64Nano(max(timestamp)) FROM %s", logsTable)).Scan(&logsLastSeenAt); err == nil {
|
||||
if logsLastSeenAt.Unix() != 0 {
|
||||
stats["telemetry.logs.last_observed.time"] = logsLastSeenAt.UTC()
|
||||
stats["telemetry.logs.last_observed.time_unix"] = logsLastSeenAt.Unix()
|
||||
}
|
||||
} else {
|
||||
q.logger.DebugContext(ctx, "failed to collect logs last observed", errors.Attr(err))
|
||||
}
|
||||
|
||||
var metricsLastSeenAt time.Time
|
||||
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT toDateTime(max(unix_milli) / 1000) FROM %s", metricsTable)).Scan(&metricsLastSeenAt); err == nil {
|
||||
if metricsLastSeenAt.Unix() != 0 {
|
||||
stats["telemetry.metrics.last_observed.time"] = metricsLastSeenAt.UTC()
|
||||
stats["telemetry.metrics.last_observed.time_unix"] = metricsLastSeenAt.Unix()
|
||||
}
|
||||
} else {
|
||||
q.logger.DebugContext(ctx, "failed to collect metrics last observed", errors.Attr(err))
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
@@ -62,7 +62,7 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
||||
numericColsCount := 0
|
||||
for i, ct := range colTypes {
|
||||
slots[i] = reflect.New(ct.ScanType()).Interface()
|
||||
if isNumericKind(ct.ScanType()) {
|
||||
if numericKind(ct.ScanType().Kind()) {
|
||||
numericColsCount++
|
||||
}
|
||||
}
|
||||
@@ -270,14 +270,8 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
||||
}, nil
|
||||
}
|
||||
|
||||
func isNumericKind(t reflect.Type) bool {
|
||||
if t == nil {
|
||||
return false
|
||||
}
|
||||
for t.Kind() == reflect.Ptr || t.Kind() == reflect.UnsafePointer {
|
||||
t = t.Elem()
|
||||
}
|
||||
switch t.Kind() {
|
||||
func numericKind(k reflect.Kind) bool {
|
||||
switch k {
|
||||
case reflect.Float32, reflect.Float64,
|
||||
reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
|
||||
reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
|
||||
@@ -296,13 +290,7 @@ func readAsScalar(rows driver.Rows, queryName string) (*qbtypes.ScalarData, erro
|
||||
var aggIndex int64
|
||||
for i, name := range colNames {
|
||||
colType := qbtypes.ColumnTypeGroup
|
||||
// Builder queries aliases aggregation columns as __result_N (always numeric) and wraps group-by keys with toString (always string);
|
||||
// Raw ClickHouse queries may use any aliases.
|
||||
// Handling Builder queries, If name like __result_N -> aggregation, otherwise group-by column
|
||||
// Handling Raw ClickHouse queries, If type is numeric -> aggregation, otherwise group-by column
|
||||
// NOTE: For clickhouse queries, its wrong to assume that numeric columns are always aggregations, user might be grouping by on integer status_code.
|
||||
// However, we are fine with this for now. If need arises, simplest way would be to solve this on the frontend side by asking user a mapping of column names to column types.
|
||||
if aggRe.MatchString(name) || isNumericKind(colTypes[i].ScanType()) {
|
||||
if aggRe.MatchString(name) {
|
||||
colType = qbtypes.ColumnTypeAggregation
|
||||
}
|
||||
cd[i] = &qbtypes.ColumnDescriptor{
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/statsreporter"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
@@ -12,6 +13,7 @@ import (
|
||||
type Querier interface {
|
||||
QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error)
|
||||
QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest, client *qbtypes.RawStream)
|
||||
statsreporter.StatsCollector
|
||||
}
|
||||
|
||||
// BucketCache is the interface for bucket-based caching.
|
||||
|
||||
@@ -49,6 +49,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/ruler"
|
||||
"github.com/SigNoz/signoz/pkg/ruler/signozruler"
|
||||
"github.com/SigNoz/signoz/pkg/statsreporter"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/zeus"
|
||||
)
|
||||
@@ -80,6 +81,7 @@ type Handlers struct {
|
||||
TraceDetail tracedetail.Handler
|
||||
RulerHandler ruler.Handler
|
||||
LLMPricingRuleHandler llmpricingrule.Handler
|
||||
StatsHandler statsreporter.Handler
|
||||
}
|
||||
|
||||
func NewHandlers(
|
||||
@@ -97,6 +99,7 @@ func NewHandlers(
|
||||
registryHandler factory.Handler,
|
||||
alertmanagerService alertmanager.Alertmanager,
|
||||
rulerService ruler.Ruler,
|
||||
statsAggregator statsreporter.Aggregator,
|
||||
) Handlers {
|
||||
return Handlers{
|
||||
SavedView: implsavedview.NewHandler(modules.SavedView),
|
||||
@@ -125,5 +128,6 @@ func NewHandlers(
|
||||
TraceDetail: impltracedetail.NewHandler(modules.TraceDetail),
|
||||
RulerHandler: signozruler.NewHandler(rulerService),
|
||||
LLMPricingRuleHandler: impllmpricingrule.NewHandler(modules.LLMPricingRule),
|
||||
StatsHandler: statsreporter.NewHandler(statsAggregator),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ func TestNewHandlers(t *testing.T) {
|
||||
|
||||
querierHandler := querier.NewHandler(providerSettings, nil, nil)
|
||||
registryHandler := factory.NewHandler(nil)
|
||||
handlers := NewHandlers(modules, providerSettings, nil, querierHandler, nil, nil, nil, nil, nil, nil, nil, registryHandler, alertmanager, nil)
|
||||
handlers := NewHandlers(modules, providerSettings, nil, querierHandler, nil, nil, nil, nil, nil, nil, nil, registryHandler, alertmanager, nil, nil)
|
||||
reflectVal := reflect.ValueOf(handlers)
|
||||
for i := 0; i < reflectVal.NumField(); i++ {
|
||||
f := reflectVal.Field(i)
|
||||
|
||||
@@ -36,6 +36,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/user"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/ruler"
|
||||
"github.com/SigNoz/signoz/pkg/statsreporter"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/zeus"
|
||||
"github.com/swaggest/jsonschema-go"
|
||||
@@ -83,6 +84,7 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
|
||||
struct{ llmpricingrule.Handler }{},
|
||||
struct{ tracedetail.Handler }{},
|
||||
struct{ ruler.Handler }{},
|
||||
struct{ statsreporter.Handler }{},
|
||||
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -264,9 +264,9 @@ func NewSharderProviderFactories() factory.NamedMap[factory.ProviderFactory[shar
|
||||
)
|
||||
}
|
||||
|
||||
func NewStatsReporterProviderFactories(telemetryStore telemetrystore.TelemetryStore, collectors []statsreporter.StatsCollector, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.NamedMap[factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config]] {
|
||||
func NewStatsReporterProviderFactories(aggregator statsreporter.Aggregator, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.NamedMap[factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
analyticsstatsreporter.NewFactory(telemetryStore, collectors, orgGetter, userGetter, tokenizer, build, analyticsConfig),
|
||||
analyticsstatsreporter.NewFactory(aggregator, orgGetter, userGetter, tokenizer, build, analyticsConfig),
|
||||
noopstatsreporter.NewFactory(),
|
||||
)
|
||||
}
|
||||
@@ -309,6 +309,7 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
|
||||
handlers.LLMPricingRuleHandler,
|
||||
handlers.TraceDetail,
|
||||
handlers.RulerHandler,
|
||||
handlers.StatsHandler,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -85,8 +85,8 @@ func TestNewProviderFactories(t *testing.T) {
|
||||
|
||||
userGetter := impluser.NewGetter(impluser.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), instrumentationtest.New().ToProviderSettings()), userRoleStore, flagger)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual)), nil)
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherEqual)
|
||||
NewStatsReporterProviderFactories(telemetryStore, []statsreporter.StatsCollector{}, orgGetter, userGetter, tokenizertest.NewMockTokenizer(t), version.Build{}, analytics.Config{Enabled: true})
|
||||
statsAggregator := statsreporter.NewAggregator(providerSettings, []statsreporter.StatsCollector{})
|
||||
NewStatsReporterProviderFactories(statsAggregator, orgGetter, userGetter, tokenizertest.NewMockTokenizer(t), version.Build{}, analytics.Config{Enabled: true})
|
||||
})
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
|
||||
@@ -499,14 +499,18 @@ func New(
|
||||
serviceAccount,
|
||||
cloudIntegrationModule,
|
||||
modules.LogsPipeline,
|
||||
querier,
|
||||
}
|
||||
|
||||
// Initialize the stats aggregator (always-on, independent of whether reporting is enabled)
|
||||
statsAggregator := statsreporter.NewAggregator(providerSettings, statsCollectors)
|
||||
|
||||
// Initialize stats reporter from the available stats reporter provider factories
|
||||
statsReporter, err := factory.NewProviderFromNamedMap(
|
||||
ctx,
|
||||
providerSettings,
|
||||
config.StatsReporter,
|
||||
NewStatsReporterProviderFactories(telemetrystore, statsCollectors, orgGetter, userGetter, tokenizer, version.Info, config.Analytics),
|
||||
NewStatsReporterProviderFactories(statsAggregator, orgGetter, userGetter, tokenizer, version.Info, config.Analytics),
|
||||
config.StatsReporter.Provider(),
|
||||
)
|
||||
if err != nil {
|
||||
@@ -535,7 +539,7 @@ func New(
|
||||
|
||||
// Initialize all handlers for the modules
|
||||
registryHandler := factory.NewHandler(registry)
|
||||
handlers := NewHandlers(modules, providerSettings, analytics, querierHandler, licensing, global, flagger, gateway, telemetryMetadataStore, authz, zeus, registryHandler, alertmanager, rulerInstance)
|
||||
handlers := NewHandlers(modules, providerSettings, analytics, querierHandler, licensing, global, flagger, gateway, telemetryMetadataStore, authz, zeus, registryHandler, alertmanager, rulerInstance, statsAggregator)
|
||||
|
||||
// Initialize the API server (after registry so it can access service health)
|
||||
apiserverInstance, err := factory.NewProviderFromNamedMap(
|
||||
|
||||
67
pkg/statsreporter/aggregator.go
Normal file
67
pkg/statsreporter/aggregator.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package statsreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// Aggregator aggregates stats from every registered StatsCollector for a single organization.
|
||||
type Aggregator interface {
|
||||
Aggregate(ctx context.Context, orgID valuer.UUID) (map[string]any, error)
|
||||
}
|
||||
|
||||
type aggregator struct {
|
||||
// settings
|
||||
settings factory.ScopedProviderSettings
|
||||
|
||||
// a list of collectors, used to collect stats from across the codebase
|
||||
collectors []StatsCollector
|
||||
}
|
||||
|
||||
func NewAggregator(providerSettings factory.ProviderSettings, collectors []StatsCollector) Aggregator {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/statsreporter")
|
||||
|
||||
return &aggregator{
|
||||
settings: settings,
|
||||
collectors: collectors,
|
||||
}
|
||||
}
|
||||
|
||||
func (aggregator *aggregator) Aggregate(ctx context.Context, orgID valuer.UUID) (map[string]any, error) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.CodeNamespace: "statsreporter",
|
||||
instrumentationtypes.CodeFunctionName: "Aggregate",
|
||||
})
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(aggregator.collectors))
|
||||
|
||||
stats := make(map[string]any, 0)
|
||||
mtx := sync.Mutex{}
|
||||
|
||||
for _, collector := range aggregator.collectors {
|
||||
go func(collector StatsCollector) {
|
||||
defer wg.Done()
|
||||
|
||||
collectorStats, err := collector.Collect(ctx, orgID)
|
||||
if err != nil {
|
||||
aggregator.settings.Logger().ErrorContext(ctx, "failed to collect stats", errors.Attr(err))
|
||||
return
|
||||
}
|
||||
|
||||
mtx.Lock()
|
||||
for k, v := range collectorStats {
|
||||
stats[k] = v
|
||||
}
|
||||
mtx.Unlock()
|
||||
}(collector)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
@@ -3,7 +3,6 @@ package analyticsstatsreporter
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
@@ -16,11 +15,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user"
|
||||
"github.com/SigNoz/signoz/pkg/statsreporter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/tokenizer"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/SigNoz/signoz/pkg/version"
|
||||
)
|
||||
@@ -32,11 +28,8 @@ type provider struct {
|
||||
// config
|
||||
config statsreporter.Config
|
||||
|
||||
// used to get telemetry details. srikanthcvv to move this to the querier layer
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
|
||||
// a list of collectors, used to collect stats from across the codebase
|
||||
collectors []statsreporter.StatsCollector
|
||||
// used to aggregate stats for an organization
|
||||
aggregator statsreporter.Aggregator
|
||||
|
||||
// used to get organizations
|
||||
orgGetter organization.Getter
|
||||
@@ -60,9 +53,9 @@ type provider struct {
|
||||
stopC chan struct{}
|
||||
}
|
||||
|
||||
func NewFactory(telemetryStore telemetrystore.TelemetryStore, collectors []statsreporter.StatsCollector, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config] {
|
||||
func NewFactory(aggregator statsreporter.Aggregator, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("analytics"), func(ctx context.Context, settings factory.ProviderSettings, config statsreporter.Config) (statsreporter.StatsReporter, error) {
|
||||
return New(ctx, settings, config, telemetryStore, collectors, orgGetter, userGetter, tokenizer, build, analyticsConfig)
|
||||
return New(ctx, settings, config, aggregator, orgGetter, userGetter, tokenizer, build, analyticsConfig)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -70,8 +63,7 @@ func New(
|
||||
ctx context.Context,
|
||||
providerSettings factory.ProviderSettings,
|
||||
config statsreporter.Config,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
collectors []statsreporter.StatsCollector,
|
||||
aggregator statsreporter.Aggregator,
|
||||
orgGetter organization.Getter,
|
||||
userGetter user.Getter,
|
||||
tokenizer tokenizer.Tokenizer,
|
||||
@@ -86,17 +78,16 @@ func New(
|
||||
}
|
||||
|
||||
return &provider{
|
||||
settings: settings,
|
||||
config: config,
|
||||
telemetryStore: telemetryStore,
|
||||
collectors: collectors,
|
||||
orgGetter: orgGetter,
|
||||
userGetter: userGetter,
|
||||
analytics: analytics,
|
||||
tokenizer: tokenizer,
|
||||
build: build,
|
||||
deployment: deployment,
|
||||
stopC: make(chan struct{}),
|
||||
settings: settings,
|
||||
config: config,
|
||||
aggregator: aggregator,
|
||||
orgGetter: orgGetter,
|
||||
userGetter: userGetter,
|
||||
analytics: analytics,
|
||||
tokenizer: tokenizer,
|
||||
build: build,
|
||||
deployment: deployment,
|
||||
stopC: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -134,7 +125,12 @@ func (provider *provider) Report(ctx context.Context) error {
|
||||
}
|
||||
|
||||
for _, org := range orgs {
|
||||
stats := provider.collectOrg(ctx, org.ID)
|
||||
stats, err := provider.aggregator.Aggregate(ctx, org.ID)
|
||||
if err != nil {
|
||||
provider.settings.Logger().WarnContext(ctx, "failed to aggregate stats", errors.Attr(err), slog.Any("org_id", org.ID))
|
||||
continue
|
||||
}
|
||||
|
||||
if len(stats) == 0 {
|
||||
provider.settings.Logger().WarnContext(ctx, "no stats collected", slog.Any("org_id", org.ID))
|
||||
continue
|
||||
@@ -204,75 +200,3 @@ func (provider *provider) Stop(ctx context.Context) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) collectOrg(ctx context.Context, orgID valuer.UUID) map[string]any {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.CodeNamespace: "statsreporter",
|
||||
instrumentationtypes.CodeFunctionName: "collectOrg",
|
||||
})
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(provider.collectors))
|
||||
|
||||
stats := make(map[string]any, 0)
|
||||
mtx := sync.Mutex{}
|
||||
|
||||
for _, collector := range provider.collectors {
|
||||
go func(collector statsreporter.StatsCollector) {
|
||||
defer wg.Done()
|
||||
|
||||
collectorStats, err := collector.Collect(ctx, orgID)
|
||||
if err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to collect stats", errors.Attr(err))
|
||||
return
|
||||
}
|
||||
|
||||
mtx.Lock()
|
||||
for k, v := range collectorStats {
|
||||
stats[k] = v
|
||||
}
|
||||
mtx.Unlock()
|
||||
}(collector)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
var traces uint64
|
||||
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT COUNT(*) FROM signoz_traces.distributed_signoz_index_v3").Scan(&traces); err == nil {
|
||||
stats["telemetry.traces.count"] = traces
|
||||
}
|
||||
|
||||
var logs uint64
|
||||
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT COUNT(*) FROM signoz_logs.distributed_logs_v2").Scan(&logs); err == nil {
|
||||
stats["telemetry.logs.count"] = logs
|
||||
}
|
||||
|
||||
var metrics uint64
|
||||
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT COUNT(*) FROM signoz_metrics.distributed_samples_v4").Scan(&metrics); err == nil {
|
||||
stats["telemetry.metrics.count"] = metrics
|
||||
}
|
||||
|
||||
var tracesLastSeenAt time.Time
|
||||
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT max(timestamp) FROM signoz_traces.distributed_signoz_index_v3").Scan(&tracesLastSeenAt); err == nil {
|
||||
if tracesLastSeenAt.Unix() != 0 {
|
||||
stats["telemetry.traces.last_observed.time"] = tracesLastSeenAt.UTC()
|
||||
stats["telemetry.traces.last_observed.time_unix"] = tracesLastSeenAt.Unix()
|
||||
}
|
||||
}
|
||||
|
||||
var logsLastSeenAt time.Time
|
||||
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT fromUnixTimestamp64Nano(max(timestamp)) FROM signoz_logs.distributed_logs_v2").Scan(&logsLastSeenAt); err == nil {
|
||||
if logsLastSeenAt.Unix() != 0 {
|
||||
stats["telemetry.logs.last_observed.time"] = logsLastSeenAt.UTC()
|
||||
stats["telemetry.logs.last_observed.time_unix"] = logsLastSeenAt.Unix()
|
||||
}
|
||||
}
|
||||
|
||||
var metricsLastSeenAt time.Time
|
||||
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT toDateTime(max(unix_milli) / 1000) FROM signoz_metrics.distributed_samples_v4").Scan(&metricsLastSeenAt); err == nil {
|
||||
if metricsLastSeenAt.Unix() != 0 {
|
||||
stats["telemetry.metrics.last_observed.time"] = metricsLastSeenAt.UTC()
|
||||
stats["telemetry.metrics.last_observed.time_unix"] = metricsLastSeenAt.Unix()
|
||||
}
|
||||
}
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
46
pkg/statsreporter/handler.go
Normal file
46
pkg/statsreporter/handler.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package statsreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Handler interface {
|
||||
Get(http.ResponseWriter, *http.Request)
|
||||
}
|
||||
|
||||
type handler struct {
|
||||
aggregator Aggregator
|
||||
}
|
||||
|
||||
func NewHandler(aggregator Aggregator) Handler {
|
||||
return &handler{
|
||||
aggregator: aggregator,
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *handler) Get(rw http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
|
||||
stats, err := handler.aggregator.Aggregate(ctx, orgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, stats)
|
||||
}
|
||||
@@ -1,74 +0,0 @@
|
||||
"""
|
||||
Integration tests for raw ClickHouse SQL queries in the querier.
|
||||
"""
|
||||
|
||||
from collections.abc import Callable
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from http import HTTPStatus
|
||||
|
||||
from fixtures import querier, types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
|
||||
|
||||
def test_clickhouse_scalar_numeric_result_alias_classified_as_aggregation(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
) -> None:
|
||||
"""A numeric column aliased ``__result_0`` is classified as an aggregation."""
|
||||
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
response = querier.make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
int((now - timedelta(hours=1)).timestamp() * 1000),
|
||||
int(now.timestamp() * 1000),
|
||||
[
|
||||
{
|
||||
"type": "clickhouse_sql",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"query": "SELECT toFloat64(1.5) AS `__result_0`",
|
||||
"disabled": False,
|
||||
},
|
||||
}
|
||||
],
|
||||
request_type=querier.RequestType.SCALAR,
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["status"] == "success"
|
||||
|
||||
columns = querier.get_scalar_columns(response.json())
|
||||
assert len(columns) == 1
|
||||
assert columns[0]["name"] == "__result_0"
|
||||
assert columns[0]["columnType"] == "aggregation"
|
||||
assert columns[0]["aggregationIndex"] == 0
|
||||
|
||||
response = querier.make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
int((now - timedelta(hours=1)).timestamp() * 1000),
|
||||
int(now.timestamp() * 1000),
|
||||
[
|
||||
{
|
||||
"type": "clickhouse_sql",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"query": "SELECT toNullable(toFloat64(1.5)) AS value",
|
||||
"disabled": False,
|
||||
},
|
||||
}
|
||||
],
|
||||
request_type=querier.RequestType.SCALAR,
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["status"] == "success"
|
||||
|
||||
columns = querier.get_scalar_columns(response.json())
|
||||
assert len(columns) == 1
|
||||
assert columns[0]["name"] == "value"
|
||||
assert columns[0]["columnType"] == "aggregation"
|
||||
assert columns[0]["aggregationIndex"] == 0
|
||||
Reference in New Issue
Block a user