Compare commits

..

7 Commits

Author SHA1 Message Date
grandwizard28
aa454f6b2a docs(contributing): note MustNewUUID for IDs from claims 2026-06-12 20:00:57 +05:30
grandwizard28
e38ed9649f fix(flagger): use MustNewUUID for org from claims
Claims come from validated auth context, so the org UUID is guaranteed
valid; drop the dead NewUUID error branch.
2026-06-12 19:53:51 +05:30
grandwizard28
386358214c fix(statsreporter): use MustNewUUID for org from claims
Claims come from validated auth context, so the org UUID is guaranteed
valid; drop the dead NewUUID error branch.
2026-06-12 19:51:50 +05:30
grandwizard28
38b28ba4c5 chore: remove comment from querier Collect 2026-06-12 16:28:06 +05:30
grandwizard28
9237e25ef8 chore: regenerate openapi spec and frontend client
Backend docs/api/openapi.yml gains the GET /api/v1/stats (GetStats)
operation; the Orval client gains a useGetStats hook and GetStats200
type.
2026-06-12 16:16:47 +05:30
grandwizard28
867d626b27 refactor(statsreporter): collect telemetry stats via the querier
Move the trace/log/metric row-count and last-observed queries out of the
stats aggregator and into the querier, which now implements
statsreporter.StatsCollector. The aggregator becomes a pure collector
fan-out and no longer depends on telemetrystore; the querier is wired in
as one of the stats collectors.
2026-06-12 16:03:48 +05:30
grandwizard28
0769ac0e7d feat(statsreporter): expose collected stats via GET /api/v1/stats
Extract per-org stats collection out of the analytics reporter into an
always-on Aggregator (collector fan-out + telemetry-store counts) shared
by the reporter and a new HTTP handler. The GET /api/v1/stats endpoint
returns the caller's org stats regardless of whether scheduled reporting
is enabled.
2026-06-12 15:36:59 +05:30
28 changed files with 481 additions and 303 deletions

View File

@@ -12790,6 +12790,53 @@ paths:
summary: Update a span mapper
tags:
- spanmapper
/api/v1/stats:
get:
deprecated: false
description: This endpoint returns the collected stats for the organization
operationId: GetStats
responses:
"200":
content:
application/json:
schema:
properties:
data:
additionalProperties: {}
type: object
status:
type: string
required:
- status
- data
type: object
description: OK
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Get stats
tags:
- stats
/api/v1/testChannel:
post:
deprecated: true

View File

@@ -109,6 +109,20 @@ func (h *handler) CreateThing(rw http.ResponseWriter, req *http.Request) {
}
```
When you need an ID from `claims` as a `valuer.UUID` (for example to pass it to a module), derive it with the `Must*` constructor instead of `NewUUID` plus an error check. Claims are validated by the auth middleware, so the conversion cannot fail and the error branch would be dead code:
```go
// Good — claims are pre-validated, the conversion cannot fail.
orgID := valuer.MustNewUUID(claims.OrgID)
// Avoid — the error path is unreachable.
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
```
### 3. Register the handler in `signozapiserver`
In `pkg/apiserver/signozapiserver`, add a route in the appropriate `add*Routes` function (`addUserRoutes`, `addSessionRoutes`, `addOrgRoutes`, etc.). The pattern is:
@@ -387,3 +401,4 @@ Note the discriminator property lives in the variants, not on the parent — the
- **Add `nullable:"true"`** on fields that can be `null`. Pay special attention to slices and maps -- in Go these default to `nil` which serializes to `null`. If the field should always be an array, initialize it and do not mark it nullable.
- **Implement `Enum()`** on every type that has a fixed set of acceptable values so the JSON schema generates proper `enum` constraints.
- **Add request examples** via `RequestExamples` in `OpenAPIDef` for any non-trivial endpoint. See `pkg/apiserver/signozapiserver/querier.go` for reference.
- **Derive IDs from `claims` with `valuer.MustNewUUID`** (e.g. `claims.OrgID`, `claims.UserID`). Claims are pre-validated by the auth middleware, so use the `Must*` constructor — don't write `NewUUID` followed by an `if err != nil { render.Error(...); return }` block.

View File

@@ -9746,6 +9746,19 @@ export type UpdateSpanMapperPathParameters = {
groupId: string;
mapperId: string;
};
export type GetStats200Data = { [key: string]: unknown };
export type GetStats200 = {
/**
* @type object
*/
data: GetStats200Data;
/**
* @type string
*/
status: string;
};
export type GetTraceAggregationsPathParameters = {
traceID: string;
};

View File

@@ -0,0 +1,96 @@
/**
* ! Do not edit manually
* * The file has been auto-generated using Orval for SigNoz
* * regenerate with 'pnpm generate:api'
* SigNoz
*/
import { useQuery } from 'react-query';
import type {
InvalidateOptions,
QueryClient,
QueryFunction,
QueryKey,
UseQueryOptions,
UseQueryResult,
} from 'react-query';
import type { GetStats200, RenderErrorResponseDTO } from '../sigNoz.schemas';
import { GeneratedAPIInstance } from '../../../generatedAPIInstance';
import type { ErrorType } from '../../../generatedAPIInstance';
/**
* This endpoint returns the collected stats for the organization
* @summary Get stats
*/
export const getStats = (signal?: AbortSignal) => {
return GeneratedAPIInstance<GetStats200>({
url: `/api/v1/stats`,
method: 'GET',
signal,
});
};
export const getGetStatsQueryKey = () => {
return [`/api/v1/stats`] as const;
};
export const getGetStatsQueryOptions = <
TData = Awaited<ReturnType<typeof getStats>>,
TError = ErrorType<RenderErrorResponseDTO>,
>(options?: {
query?: UseQueryOptions<Awaited<ReturnType<typeof getStats>>, TError, TData>;
}) => {
const { query: queryOptions } = options ?? {};
const queryKey = queryOptions?.queryKey ?? getGetStatsQueryKey();
const queryFn: QueryFunction<Awaited<ReturnType<typeof getStats>>> = ({
signal,
}) => getStats(signal);
return { queryKey, queryFn, ...queryOptions } as UseQueryOptions<
Awaited<ReturnType<typeof getStats>>,
TError,
TData
> & { queryKey: QueryKey };
};
export type GetStatsQueryResult = NonNullable<
Awaited<ReturnType<typeof getStats>>
>;
export type GetStatsQueryError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary Get stats
*/
export function useGetStats<
TData = Awaited<ReturnType<typeof getStats>>,
TError = ErrorType<RenderErrorResponseDTO>,
>(options?: {
query?: UseQueryOptions<Awaited<ReturnType<typeof getStats>>, TError, TData>;
}): UseQueryResult<TData, TError> & { queryKey: QueryKey } {
const queryOptions = getGetStatsQueryOptions(options);
const query = useQuery(queryOptions) as UseQueryResult<TData, TError> & {
queryKey: QueryKey;
};
return { ...query, queryKey: queryOptions.queryKey };
}
/**
* @summary Get stats
*/
export const invalidateGetStats = async (
queryClient: QueryClient,
options?: InvalidateOptions,
): Promise<QueryClient> => {
await queryClient.invalidateQueries(
{ queryKey: getGetStatsQueryKey() },
options,
);
return queryClient;
};

View File

@@ -796,7 +796,7 @@ export const getClusterMetricsQueryPayload = (
key: k8sDeploymentDesiredKey,
type: 'Gauge',
},
aggregateOperator: 'latest',
aggregateOperator: 'avg',
dataSource: DataSource.METRICS,
disabled: false,
expression: 'B',
@@ -839,7 +839,7 @@ export const getClusterMetricsQueryPayload = (
reduceTo: ReduceOperators.LAST,
spaceAggregation: 'sum',
stepInterval: 60,
timeAggregation: 'latest',
timeAggregation: 'avg',
},
],
queryFormulas: [],

View File

@@ -40,7 +40,6 @@ import { K8S_ENTITY_EVENTS_EXPRESSION_KEY, useEntityEvents } from './hooks';
import { getEntityEventsQueryPayload, isEventsKeyNotFoundError } from './utils';
import styles from './EntityEvents.module.scss';
import { useTimezone } from 'providers/Timezone';
interface EventDataType {
key: string;
@@ -168,25 +167,17 @@ function EntityEventsContent({
[events],
);
const { formatTimezoneAdjustedTimestamp } = useTimezone();
const columns: TableColumnsType<EventDataType> = useMemo(
() => [
{ title: 'Severity', dataIndex: 'severity', key: 'severity', width: 100 },
{
title: 'Timestamp',
dataIndex: 'timestamp',
width: 240,
ellipsis: true,
key: 'timestamp',
render: (value: string | number): string =>
formatTimezoneAdjustedTimestamp(
typeof value === 'string' ? value : value / 1e6,
),
},
{ title: 'Body', dataIndex: 'body', key: 'body' },
],
[formatTimezoneAdjustedTimestamp],
);
const columns: TableColumnsType<EventDataType> = [
{ title: 'Severity', dataIndex: 'severity', key: 'severity', width: 100 },
{
title: 'Timestamp',
dataIndex: 'timestamp',
width: 240,
ellipsis: true,
key: 'timestamp',
},
{ title: 'Body', dataIndex: 'body', key: 'body' },
];
const handleExpandRowIcon = ({
expanded,

View File

@@ -41,7 +41,6 @@ import { getTraceListColumns } from './traceListColumns';
import { getEntityTracesQueryPayload } from './utils';
import styles from './EntityTraces.module.scss';
import { useTimezone } from 'providers/Timezone';
interface Props {
timeRange: {
@@ -137,11 +136,7 @@ function EntityTracesContent({
[timeRange.startTime, timeRange.endTime, userExpression],
);
const { formatTimezoneAdjustedTimestamp } = useTimezone();
const traceListColumns = getTraceListColumns(
selectedEntityTracesColumns,
formatTimezoneAdjustedTimestamp,
);
const traceListColumns = getTraceListColumns(selectedEntityTracesColumns);
const isKeyNotFound = isKeyNotFoundError(error);
const isDataEmpty =

View File

@@ -1,14 +1,15 @@
import { TableColumnsType as ColumnsType } from 'antd';
import { Badge } from '@signozhq/ui/badge';
import { Typography } from '@signozhq/ui/typography';
import { DATE_TIME_FORMATS } from 'constants/dateTimeFormats';
import { getMs } from 'container/Trace/Filters/Panel/PanelBody/Duration/util';
import {
BlockLink,
getTraceLink,
} from 'container/TracesExplorer/ListView/utils';
import dayjs from 'dayjs';
import { RowData } from 'lib/query/createTableColumnsFromQuery';
import { BaseAutocompleteData } from 'types/api/queryBuilder/queryAutocompleteResponse';
import { FormatTimezoneAdjustedTimestamp } from 'hooks/useTimezoneFormatter/useTimezoneFormatter';
const keyToLabelMap: Record<string, string> = {
timestamp: 'Timestamp',
@@ -58,7 +59,6 @@ const getValueForKey = (data: Record<string, any>, key: string): any => {
export const getTraceListColumns = (
selectedColumns: BaseAutocompleteData[],
formatTimezoneAdjustedTimestamp: FormatTimezoneAdjustedTimestamp,
): ColumnsType<RowData> => {
const columns: ColumnsType<RowData> =
selectedColumns.map(({ dataType, key, type }) => ({
@@ -73,8 +73,8 @@ export const getTraceListColumns = (
if (primaryKey === 'timestamp') {
const date =
typeof value === 'string'
? formatTimezoneAdjustedTimestamp(value)
: formatTimezoneAdjustedTimestamp(value / 1e6);
? dayjs(value).format(DATE_TIME_FORMATS.ISO_DATETIME_MS)
: dayjs(value / 1e6).format(DATE_TIME_FORMATS.ISO_DATETIME_MS);
return (
<BlockLink to={getTraceLink(itemData)} openInNewTab>

View File

@@ -1366,7 +1366,7 @@ export const getPodMetricsQueryPayload = (
orderBy: [],
queryName: 'B',
reduceTo: ReduceOperators.AVG,
spaceAggregation: 'sum',
spaceAggregation: 'avg',
stepInterval: 60,
timeAggregation: 'avg',
},

View File

@@ -86,9 +86,9 @@ export const k8sVolumesColumnsConfig: TableColumnDef<K8sVolumesData>[] = [
},
{
id: 'capacity',
header: 'Capacity',
header: 'Volume Capacity',
accessorFn: (row): number => row.volumeCapacity,
width: { min: 140 },
width: { min: 220 },
enableSort: true,
cell: ({ value }): React.ReactNode => {
const capacity = value as number;
@@ -105,9 +105,9 @@ export const k8sVolumesColumnsConfig: TableColumnDef<K8sVolumesData>[] = [
},
{
id: 'usage',
header: 'Used',
header: 'Volume Utilization',
accessorFn: (row): number => row.volumeUsage,
width: { min: 140 },
width: { min: 220 },
enableSort: true,
cell: ({ value }): React.ReactNode => {
const usage = value as number;
@@ -124,9 +124,9 @@ export const k8sVolumesColumnsConfig: TableColumnDef<K8sVolumesData>[] = [
},
{
id: 'available',
header: 'Available',
header: 'Volume Available',
accessorFn: (row): number => row.volumeAvailable,
width: { min: 140 },
width: { min: 220 },
enableSort: true,
cell: ({ value }): React.ReactNode => {
const available = value as number;
@@ -141,61 +141,4 @@ export const k8sVolumesColumnsConfig: TableColumnDef<K8sVolumesData>[] = [
);
},
},
{
id: 'inodes',
header: 'Inodes',
accessorFn: (row): number => row.volumeInodes,
width: { min: 140 },
enableSort: true,
cell: ({ value }): React.ReactNode => {
const inodes = value as number;
return (
<ValidateColumnValueWrapper
value={inodes}
entity={InfraMonitoringEntity.VOLUMES}
attribute="inodes metric"
>
<TanStackTable.Text>{inodes}</TanStackTable.Text>
</ValidateColumnValueWrapper>
);
},
},
{
id: 'inodesUsed',
header: 'Inodes Used',
accessorFn: (row): number => row.volumeInodesUsed,
width: { min: 160 },
enableSort: true,
cell: ({ value }): React.ReactNode => {
const inodesUsed = value as number;
return (
<ValidateColumnValueWrapper
value={inodesUsed}
entity={InfraMonitoringEntity.VOLUMES}
attribute="inodes used metric"
>
<TanStackTable.Text>{inodesUsed}</TanStackTable.Text>
</ValidateColumnValueWrapper>
);
},
},
{
id: 'inodesFree',
header: 'Inodes Free',
accessorFn: (row): number => row.volumeInodesFree,
width: { min: 160 },
enableSort: true,
cell: ({ value }): React.ReactNode => {
const inodesFree = value as number;
return (
<ValidateColumnValueWrapper
value={inodesFree}
entity={InfraMonitoringEntity.VOLUMES}
attribute="inodes free metric"
>
<TanStackTable.Text>{inodesFree}</TanStackTable.Text>
</ValidateColumnValueWrapper>
);
},
},
];

View File

@@ -22,13 +22,11 @@ interface CacheEntry {
const CACHE_SIZE_LIMIT = 1000;
const CACHE_CLEANUP_PERCENTAGE = 0.5; // Remove 50% when limit is reached
export type FormatTimezoneAdjustedTimestamp = (
input: TimestampInput,
format?: string,
) => string;
function useTimezoneFormatter({ userTimezone }: { userTimezone: Timezone }): {
formatTimezoneAdjustedTimestamp: FormatTimezoneAdjustedTimestamp;
formatTimezoneAdjustedTimestamp: (
input: TimestampInput,
format?: string,
) => string;
} {
// Initialize cache using useMemo to persist between renders
const cache = useMemo(() => new Map<string, CacheEntry>(), []);

View File

@@ -19,14 +19,17 @@ import {
} from 'components/CustomTimePicker/timezoneUtils';
import { LOCALSTORAGE } from 'constants/localStorage';
import useTimezoneFormatter, {
FormatTimezoneAdjustedTimestamp,
TimestampInput,
} from 'hooks/useTimezoneFormatter/useTimezoneFormatter';
export interface TimezoneContextType {
timezone: Timezone;
browserTimezone: Timezone;
updateTimezone: (timezone: Timezone) => void;
formatTimezoneAdjustedTimestamp: FormatTimezoneAdjustedTimestamp;
formatTimezoneAdjustedTimestamp: (
input: TimestampInput,
format?: string,
) => string;
isAdaptationEnabled: boolean;
setIsAdaptationEnabled: Dispatch<SetStateAction<boolean>>;
}

View File

@@ -31,6 +31,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/ruler"
"github.com/SigNoz/signoz/pkg/statsreporter"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/zeus"
@@ -70,6 +71,7 @@ type provider struct {
traceDetailHandler tracedetail.Handler
rulerHandler ruler.Handler
llmPricingRuleHandler llmpricingrule.Handler
statsHandler statsreporter.Handler
}
func NewFactory(
@@ -102,6 +104,7 @@ func NewFactory(
llmPricingRuleHandler llmpricingrule.Handler,
traceDetailHandler tracedetail.Handler,
rulerHandler ruler.Handler,
statsHandler statsreporter.Handler,
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
return newProvider(
@@ -137,6 +140,7 @@ func NewFactory(
llmPricingRuleHandler,
traceDetailHandler,
rulerHandler,
statsHandler,
)
})
}
@@ -174,6 +178,7 @@ func newProvider(
llmPricingRuleHandler llmpricingrule.Handler,
traceDetailHandler tracedetail.Handler,
rulerHandler ruler.Handler,
statsHandler statsreporter.Handler,
) (apiserver.APIServer, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
router := mux.NewRouter().UseEncodedPath()
@@ -210,6 +215,7 @@ func newProvider(
traceDetailHandler: traceDetailHandler,
rulerHandler: rulerHandler,
llmPricingRuleHandler: llmPricingRuleHandler,
statsHandler: statsHandler,
}
provider.authzMiddleware = middleware.NewAuthZ(settings.Logger(), orgGetter, authzService)
@@ -334,6 +340,10 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
return err
}
if err := provider.addStatsReporterRoutes(router); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,33 @@
package signozapiserver
import (
"net/http"
"github.com/SigNoz/signoz/pkg/http/handler"
"github.com/SigNoz/signoz/pkg/types"
"github.com/gorilla/mux"
)
func (provider *provider) addStatsReporterRoutes(router *mux.Router) error {
if err := router.Handle("/api/v1/stats", handler.New(
provider.authzMiddleware.ViewAccess(provider.statsHandler.Get),
handler.OpenAPIDef{
ID: "GetStats",
Tags: []string{"stats"},
Summary: "Get stats",
Description: "This endpoint returns the collected stats for the organization",
Request: nil,
RequestContentType: "",
Response: map[string]any{},
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodGet).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -35,11 +35,7 @@ func (handler *handler) GetFeatures(rw http.ResponseWriter, r *http.Request) {
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)

74
pkg/querier/collect.go Normal file
View File

@@ -0,0 +1,74 @@
package querier
import (
"context"
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
"github.com/SigNoz/signoz/pkg/valuer"
)
func (q *querier) Collect(ctx context.Context, _ valuer.UUID) (map[string]any, error) {
stats := make(map[string]any)
tracesTable := fmt.Sprintf("%s.%s", telemetrytraces.DBName, telemetrytraces.SpanIndexV3TableName)
logsTable := fmt.Sprintf("%s.%s", telemetrylogs.DBName, telemetrylogs.LogsV2TableName)
metricsTable := fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.SamplesV4TableName)
var traces uint64
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", tracesTable)).Scan(&traces); err == nil {
stats["telemetry.traces.count"] = traces
} else {
q.logger.DebugContext(ctx, "failed to collect traces count", errors.Attr(err))
}
var logs uint64
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", logsTable)).Scan(&logs); err == nil {
stats["telemetry.logs.count"] = logs
} else {
q.logger.DebugContext(ctx, "failed to collect logs count", errors.Attr(err))
}
var metrics uint64
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", metricsTable)).Scan(&metrics); err == nil {
stats["telemetry.metrics.count"] = metrics
} else {
q.logger.DebugContext(ctx, "failed to collect metrics count", errors.Attr(err))
}
var tracesLastSeenAt time.Time
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT max(timestamp) FROM %s", tracesTable)).Scan(&tracesLastSeenAt); err == nil {
if tracesLastSeenAt.Unix() != 0 {
stats["telemetry.traces.last_observed.time"] = tracesLastSeenAt.UTC()
stats["telemetry.traces.last_observed.time_unix"] = tracesLastSeenAt.Unix()
}
} else {
q.logger.DebugContext(ctx, "failed to collect traces last observed", errors.Attr(err))
}
var logsLastSeenAt time.Time
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT fromUnixTimestamp64Nano(max(timestamp)) FROM %s", logsTable)).Scan(&logsLastSeenAt); err == nil {
if logsLastSeenAt.Unix() != 0 {
stats["telemetry.logs.last_observed.time"] = logsLastSeenAt.UTC()
stats["telemetry.logs.last_observed.time_unix"] = logsLastSeenAt.Unix()
}
} else {
q.logger.DebugContext(ctx, "failed to collect logs last observed", errors.Attr(err))
}
var metricsLastSeenAt time.Time
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT toDateTime(max(unix_milli) / 1000) FROM %s", metricsTable)).Scan(&metricsLastSeenAt); err == nil {
if metricsLastSeenAt.Unix() != 0 {
stats["telemetry.metrics.last_observed.time"] = metricsLastSeenAt.UTC()
stats["telemetry.metrics.last_observed.time_unix"] = metricsLastSeenAt.Unix()
}
} else {
q.logger.DebugContext(ctx, "failed to collect metrics last observed", errors.Attr(err))
}
return stats, nil
}

View File

@@ -62,7 +62,7 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
numericColsCount := 0
for i, ct := range colTypes {
slots[i] = reflect.New(ct.ScanType()).Interface()
if isNumericKind(ct.ScanType()) {
if numericKind(ct.ScanType().Kind()) {
numericColsCount++
}
}
@@ -270,14 +270,8 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
}, nil
}
func isNumericKind(t reflect.Type) bool {
if t == nil {
return false
}
for t.Kind() == reflect.Ptr || t.Kind() == reflect.UnsafePointer {
t = t.Elem()
}
switch t.Kind() {
func numericKind(k reflect.Kind) bool {
switch k {
case reflect.Float32, reflect.Float64,
reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
@@ -296,13 +290,7 @@ func readAsScalar(rows driver.Rows, queryName string) (*qbtypes.ScalarData, erro
var aggIndex int64
for i, name := range colNames {
colType := qbtypes.ColumnTypeGroup
// Builder queries aliases aggregation columns as __result_N (always numeric) and wraps group-by keys with toString (always string);
// Raw ClickHouse queries may use any aliases.
// Handling Builder queries, If name like __result_N -> aggregation, otherwise group-by column
// Handling Raw ClickHouse queries, If type is numeric -> aggregation, otherwise group-by column
// NOTE: For clickhouse queries, its wrong to assume that numeric columns are always aggregations, user might be grouping by on integer status_code.
// However, we are fine with this for now. If need arises, simplest way would be to solve this on the frontend side by asking user a mapping of column names to column types.
if aggRe.MatchString(name) || isNumericKind(colTypes[i].ScanType()) {
if aggRe.MatchString(name) {
colType = qbtypes.ColumnTypeAggregation
}
cd[i] = &qbtypes.ColumnDescriptor{

View File

@@ -4,6 +4,7 @@ import (
"context"
"net/http"
"github.com/SigNoz/signoz/pkg/statsreporter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -12,6 +13,7 @@ import (
type Querier interface {
QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error)
QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest, client *qbtypes.RawStream)
statsreporter.StatsCollector
}
// BucketCache is the interface for bucket-based caching.

View File

@@ -49,6 +49,7 @@ import (
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/ruler"
"github.com/SigNoz/signoz/pkg/ruler/signozruler"
"github.com/SigNoz/signoz/pkg/statsreporter"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/zeus"
)
@@ -80,6 +81,7 @@ type Handlers struct {
TraceDetail tracedetail.Handler
RulerHandler ruler.Handler
LLMPricingRuleHandler llmpricingrule.Handler
StatsHandler statsreporter.Handler
}
func NewHandlers(
@@ -97,6 +99,7 @@ func NewHandlers(
registryHandler factory.Handler,
alertmanagerService alertmanager.Alertmanager,
rulerService ruler.Ruler,
statsAggregator statsreporter.Aggregator,
) Handlers {
return Handlers{
SavedView: implsavedview.NewHandler(modules.SavedView),
@@ -125,5 +128,6 @@ func NewHandlers(
TraceDetail: impltracedetail.NewHandler(modules.TraceDetail),
RulerHandler: signozruler.NewHandler(rulerService),
LLMPricingRuleHandler: impllmpricingrule.NewHandler(modules.LLMPricingRule),
StatsHandler: statsreporter.NewHandler(statsAggregator),
}
}

View File

@@ -63,7 +63,7 @@ func TestNewHandlers(t *testing.T) {
querierHandler := querier.NewHandler(providerSettings, nil, nil)
registryHandler := factory.NewHandler(nil)
handlers := NewHandlers(modules, providerSettings, nil, querierHandler, nil, nil, nil, nil, nil, nil, nil, registryHandler, alertmanager, nil)
handlers := NewHandlers(modules, providerSettings, nil, querierHandler, nil, nil, nil, nil, nil, nil, nil, registryHandler, alertmanager, nil, nil)
reflectVal := reflect.ValueOf(handlers)
for i := 0; i < reflectVal.NumField(); i++ {
f := reflectVal.Field(i)

View File

@@ -36,6 +36,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/ruler"
"github.com/SigNoz/signoz/pkg/statsreporter"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/zeus"
"github.com/swaggest/jsonschema-go"
@@ -83,6 +84,7 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
struct{ llmpricingrule.Handler }{},
struct{ tracedetail.Handler }{},
struct{ ruler.Handler }{},
struct{ statsreporter.Handler }{},
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
if err != nil {
return nil, err

View File

@@ -264,9 +264,9 @@ func NewSharderProviderFactories() factory.NamedMap[factory.ProviderFactory[shar
)
}
func NewStatsReporterProviderFactories(telemetryStore telemetrystore.TelemetryStore, collectors []statsreporter.StatsCollector, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.NamedMap[factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config]] {
func NewStatsReporterProviderFactories(aggregator statsreporter.Aggregator, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.NamedMap[factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config]] {
return factory.MustNewNamedMap(
analyticsstatsreporter.NewFactory(telemetryStore, collectors, orgGetter, userGetter, tokenizer, build, analyticsConfig),
analyticsstatsreporter.NewFactory(aggregator, orgGetter, userGetter, tokenizer, build, analyticsConfig),
noopstatsreporter.NewFactory(),
)
}
@@ -309,6 +309,7 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
handlers.LLMPricingRuleHandler,
handlers.TraceDetail,
handlers.RulerHandler,
handlers.StatsHandler,
),
)
}

View File

@@ -85,8 +85,8 @@ func TestNewProviderFactories(t *testing.T) {
userGetter := impluser.NewGetter(impluser.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), instrumentationtest.New().ToProviderSettings()), userRoleStore, flagger)
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual)), nil)
telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherEqual)
NewStatsReporterProviderFactories(telemetryStore, []statsreporter.StatsCollector{}, orgGetter, userGetter, tokenizertest.NewMockTokenizer(t), version.Build{}, analytics.Config{Enabled: true})
statsAggregator := statsreporter.NewAggregator(providerSettings, []statsreporter.StatsCollector{})
NewStatsReporterProviderFactories(statsAggregator, orgGetter, userGetter, tokenizertest.NewMockTokenizer(t), version.Build{}, analytics.Config{Enabled: true})
})
assert.NotPanics(t, func() {

View File

@@ -499,14 +499,18 @@ func New(
serviceAccount,
cloudIntegrationModule,
modules.LogsPipeline,
querier,
}
// Initialize the stats aggregator (always-on, independent of whether reporting is enabled)
statsAggregator := statsreporter.NewAggregator(providerSettings, statsCollectors)
// Initialize stats reporter from the available stats reporter provider factories
statsReporter, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
config.StatsReporter,
NewStatsReporterProviderFactories(telemetrystore, statsCollectors, orgGetter, userGetter, tokenizer, version.Info, config.Analytics),
NewStatsReporterProviderFactories(statsAggregator, orgGetter, userGetter, tokenizer, version.Info, config.Analytics),
config.StatsReporter.Provider(),
)
if err != nil {
@@ -535,7 +539,7 @@ func New(
// Initialize all handlers for the modules
registryHandler := factory.NewHandler(registry)
handlers := NewHandlers(modules, providerSettings, analytics, querierHandler, licensing, global, flagger, gateway, telemetryMetadataStore, authz, zeus, registryHandler, alertmanager, rulerInstance)
handlers := NewHandlers(modules, providerSettings, analytics, querierHandler, licensing, global, flagger, gateway, telemetryMetadataStore, authz, zeus, registryHandler, alertmanager, rulerInstance, statsAggregator)
// Initialize the API server (after registry so it can access service health)
apiserverInstance, err := factory.NewProviderFromNamedMap(

View File

@@ -0,0 +1,67 @@
package statsreporter
import (
"context"
"sync"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Aggregator aggregates stats from every registered StatsCollector for a single organization.
type Aggregator interface {
Aggregate(ctx context.Context, orgID valuer.UUID) (map[string]any, error)
}
type aggregator struct {
// settings
settings factory.ScopedProviderSettings
// a list of collectors, used to collect stats from across the codebase
collectors []StatsCollector
}
func NewAggregator(providerSettings factory.ProviderSettings, collectors []StatsCollector) Aggregator {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/statsreporter")
return &aggregator{
settings: settings,
collectors: collectors,
}
}
func (aggregator *aggregator) Aggregate(ctx context.Context, orgID valuer.UUID) (map[string]any, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.CodeNamespace: "statsreporter",
instrumentationtypes.CodeFunctionName: "Aggregate",
})
var wg sync.WaitGroup
wg.Add(len(aggregator.collectors))
stats := make(map[string]any, 0)
mtx := sync.Mutex{}
for _, collector := range aggregator.collectors {
go func(collector StatsCollector) {
defer wg.Done()
collectorStats, err := collector.Collect(ctx, orgID)
if err != nil {
aggregator.settings.Logger().ErrorContext(ctx, "failed to collect stats", errors.Attr(err))
return
}
mtx.Lock()
for k, v := range collectorStats {
stats[k] = v
}
mtx.Unlock()
}(collector)
}
wg.Wait()
return stats, nil
}

View File

@@ -3,7 +3,6 @@ package analyticsstatsreporter
import (
"context"
"log/slog"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
@@ -16,11 +15,8 @@ import (
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/statsreporter"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/tokenizer"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/version"
)
@@ -32,11 +28,8 @@ type provider struct {
// config
config statsreporter.Config
// used to get telemetry details. srikanthcvv to move this to the querier layer
telemetryStore telemetrystore.TelemetryStore
// a list of collectors, used to collect stats from across the codebase
collectors []statsreporter.StatsCollector
// used to aggregate stats for an organization
aggregator statsreporter.Aggregator
// used to get organizations
orgGetter organization.Getter
@@ -60,9 +53,9 @@ type provider struct {
stopC chan struct{}
}
func NewFactory(telemetryStore telemetrystore.TelemetryStore, collectors []statsreporter.StatsCollector, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config] {
func NewFactory(aggregator statsreporter.Aggregator, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config] {
return factory.NewProviderFactory(factory.MustNewName("analytics"), func(ctx context.Context, settings factory.ProviderSettings, config statsreporter.Config) (statsreporter.StatsReporter, error) {
return New(ctx, settings, config, telemetryStore, collectors, orgGetter, userGetter, tokenizer, build, analyticsConfig)
return New(ctx, settings, config, aggregator, orgGetter, userGetter, tokenizer, build, analyticsConfig)
})
}
@@ -70,8 +63,7 @@ func New(
ctx context.Context,
providerSettings factory.ProviderSettings,
config statsreporter.Config,
telemetryStore telemetrystore.TelemetryStore,
collectors []statsreporter.StatsCollector,
aggregator statsreporter.Aggregator,
orgGetter organization.Getter,
userGetter user.Getter,
tokenizer tokenizer.Tokenizer,
@@ -86,17 +78,16 @@ func New(
}
return &provider{
settings: settings,
config: config,
telemetryStore: telemetryStore,
collectors: collectors,
orgGetter: orgGetter,
userGetter: userGetter,
analytics: analytics,
tokenizer: tokenizer,
build: build,
deployment: deployment,
stopC: make(chan struct{}),
settings: settings,
config: config,
aggregator: aggregator,
orgGetter: orgGetter,
userGetter: userGetter,
analytics: analytics,
tokenizer: tokenizer,
build: build,
deployment: deployment,
stopC: make(chan struct{}),
}, nil
}
@@ -134,7 +125,12 @@ func (provider *provider) Report(ctx context.Context) error {
}
for _, org := range orgs {
stats := provider.collectOrg(ctx, org.ID)
stats, err := provider.aggregator.Aggregate(ctx, org.ID)
if err != nil {
provider.settings.Logger().WarnContext(ctx, "failed to aggregate stats", errors.Attr(err), slog.Any("org_id", org.ID))
continue
}
if len(stats) == 0 {
provider.settings.Logger().WarnContext(ctx, "no stats collected", slog.Any("org_id", org.ID))
continue
@@ -204,75 +200,3 @@ func (provider *provider) Stop(ctx context.Context) error {
return nil
}
func (provider *provider) collectOrg(ctx context.Context, orgID valuer.UUID) map[string]any {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.CodeNamespace: "statsreporter",
instrumentationtypes.CodeFunctionName: "collectOrg",
})
var wg sync.WaitGroup
wg.Add(len(provider.collectors))
stats := make(map[string]any, 0)
mtx := sync.Mutex{}
for _, collector := range provider.collectors {
go func(collector statsreporter.StatsCollector) {
defer wg.Done()
collectorStats, err := collector.Collect(ctx, orgID)
if err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to collect stats", errors.Attr(err))
return
}
mtx.Lock()
for k, v := range collectorStats {
stats[k] = v
}
mtx.Unlock()
}(collector)
}
wg.Wait()
var traces uint64
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT COUNT(*) FROM signoz_traces.distributed_signoz_index_v3").Scan(&traces); err == nil {
stats["telemetry.traces.count"] = traces
}
var logs uint64
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT COUNT(*) FROM signoz_logs.distributed_logs_v2").Scan(&logs); err == nil {
stats["telemetry.logs.count"] = logs
}
var metrics uint64
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT COUNT(*) FROM signoz_metrics.distributed_samples_v4").Scan(&metrics); err == nil {
stats["telemetry.metrics.count"] = metrics
}
var tracesLastSeenAt time.Time
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT max(timestamp) FROM signoz_traces.distributed_signoz_index_v3").Scan(&tracesLastSeenAt); err == nil {
if tracesLastSeenAt.Unix() != 0 {
stats["telemetry.traces.last_observed.time"] = tracesLastSeenAt.UTC()
stats["telemetry.traces.last_observed.time_unix"] = tracesLastSeenAt.Unix()
}
}
var logsLastSeenAt time.Time
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT fromUnixTimestamp64Nano(max(timestamp)) FROM signoz_logs.distributed_logs_v2").Scan(&logsLastSeenAt); err == nil {
if logsLastSeenAt.Unix() != 0 {
stats["telemetry.logs.last_observed.time"] = logsLastSeenAt.UTC()
stats["telemetry.logs.last_observed.time_unix"] = logsLastSeenAt.Unix()
}
}
var metricsLastSeenAt time.Time
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT toDateTime(max(unix_milli) / 1000) FROM signoz_metrics.distributed_samples_v4").Scan(&metricsLastSeenAt); err == nil {
if metricsLastSeenAt.Unix() != 0 {
stats["telemetry.metrics.last_observed.time"] = metricsLastSeenAt.UTC()
stats["telemetry.metrics.last_observed.time_unix"] = metricsLastSeenAt.Unix()
}
}
return stats
}

View File

@@ -0,0 +1,46 @@
package statsreporter
import (
"context"
"net/http"
"time"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Handler interface {
Get(http.ResponseWriter, *http.Request)
}
type handler struct {
aggregator Aggregator
}
func NewHandler(aggregator Aggregator) Handler {
return &handler{
aggregator: aggregator,
}
}
func (handler *handler) Get(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
stats, err := handler.aggregator.Aggregate(ctx, orgID)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, stats)
}

View File

@@ -1,74 +0,0 @@
"""
Integration tests for raw ClickHouse SQL queries in the querier.
"""
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
from http import HTTPStatus
from fixtures import querier, types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
def test_clickhouse_scalar_numeric_result_alias_classified_as_aggregation(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""A numeric column aliased ``__result_0`` is classified as an aggregation."""
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
response = querier.make_query_request(
signoz,
token,
int((now - timedelta(hours=1)).timestamp() * 1000),
int(now.timestamp() * 1000),
[
{
"type": "clickhouse_sql",
"spec": {
"name": "A",
"query": "SELECT toFloat64(1.5) AS `__result_0`",
"disabled": False,
},
}
],
request_type=querier.RequestType.SCALAR,
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
columns = querier.get_scalar_columns(response.json())
assert len(columns) == 1
assert columns[0]["name"] == "__result_0"
assert columns[0]["columnType"] == "aggregation"
assert columns[0]["aggregationIndex"] == 0
response = querier.make_query_request(
signoz,
token,
int((now - timedelta(hours=1)).timestamp() * 1000),
int(now.timestamp() * 1000),
[
{
"type": "clickhouse_sql",
"spec": {
"name": "A",
"query": "SELECT toNullable(toFloat64(1.5)) AS value",
"disabled": False,
},
}
],
request_type=querier.RequestType.SCALAR,
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
columns = querier.get_scalar_columns(response.json())
assert len(columns) == 1
assert columns[0]["name"] == "value"
assert columns[0]["columnType"] == "aggregation"
assert columns[0]["aggregationIndex"] == 0