Compare commits

...

2 Commits

Author SHA1 Message Date
srikanthccv
e3c04b378a fix: update rate/increase query and address several issues in builder queries 2026-02-14 11:02:09 +05:30
Abhi kumar
3c30114642 feat: added option to copy legend text (#10294)
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
* feat: added option to copy legend text

* chore: added test for legend copy action

* chore: updated legend styles

* chore: added check icon when legend copied

* chore: added copytoclipboard hook

* chore: removed copytoclipboard options
2026-02-13 08:08:04 +00:00
20 changed files with 621 additions and 857 deletions

View File

@@ -285,7 +285,6 @@ flagger:
config:
boolean:
use_span_metrics: true
interpolation_enabled: false
kafka_span_eval: false
string:
float:

View File

@@ -0,0 +1,61 @@
import { useCallback, useEffect, useRef, useState } from 'react';
const DEFAULT_COPIED_RESET_MS = 2000;
export interface UseCopyToClipboardOptions {
/** How long (ms) to keep "copied" state before resetting. Default 2000. */
copiedResetMs?: number;
}
export type ID = number | string | null;
export interface UseCopyToClipboardReturn {
/** Copy text to clipboard. Pass an optional id to track which item was copied (e.g. seriesIndex). */
copyToClipboard: (text: string, id?: ID) => void;
/** True when something was just copied and still within the reset threshold. */
isCopied: boolean;
/** The id passed to the last successful copy, or null after reset. Use to show "copied" state for a specific item (e.g. copiedId === item.seriesIndex). */
id: ID;
}
export function useCopyToClipboard(
options: UseCopyToClipboardOptions = {},
): UseCopyToClipboardReturn {
const { copiedResetMs = DEFAULT_COPIED_RESET_MS } = options;
const [state, setState] = useState<{ isCopied: boolean; id: ID }>({
isCopied: false,
id: null,
});
const timeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
useEffect(() => {
return (): void => {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current);
timeoutRef.current = null;
}
};
}, []);
const copyToClipboard = useCallback(
(text: string, id?: ID): void => {
navigator.clipboard.writeText(text).then(() => {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current);
}
setState({ isCopied: true, id: id ?? null });
timeoutRef.current = setTimeout(() => {
setState({ isCopied: false, id: null });
timeoutRef.current = null;
}, copiedResetMs);
});
},
[copiedResetMs],
);
return {
copyToClipboard,
isCopied: state.isCopied,
id: state.id,
};
}

View File

@@ -128,6 +128,15 @@
opacity: 1;
}
.legend-item-label-trigger {
display: flex;
align-items: center;
gap: 6px;
flex: 1;
min-width: 0;
cursor: pointer;
}
.legend-marker {
border-width: 2px;
border-radius: 50%;
@@ -157,10 +166,34 @@
text-overflow: ellipsis;
white-space: nowrap;
min-width: 0;
user-select: none;
}
.legend-copy-button {
display: none;
align-items: center;
justify-content: center;
flex-shrink: 0;
padding: 2px;
margin: 0;
border: none;
color: var(--bg-vanilla-400);
cursor: pointer;
border-radius: 4px;
opacity: 1;
transition: opacity 0.15s ease, color 0.15s ease;
&:hover {
color: var(--bg-vanilla-100);
}
}
&:hover {
background: rgba(255, 255, 255, 0.05);
.legend-copy-button {
display: flex;
opacity: 1;
}
}
}
@@ -172,4 +205,17 @@
}
}
}
.legend-item {
&:hover {
background: rgba(0, 0, 0, 0.05);
}
.legend-copy-button {
color: var(--bg-ink-400);
&:hover {
color: var(--bg-ink-500);
}
}
}
}

View File

@@ -2,8 +2,10 @@ import { useCallback, useMemo, useRef, useState } from 'react';
import { VirtuosoGrid } from 'react-virtuoso';
import { Input, Tooltip as AntdTooltip } from 'antd';
import cx from 'classnames';
import { useCopyToClipboard } from 'hooks/useCopyToClipboard';
import { LegendItem } from 'lib/uPlotV2/config/types';
import useLegendsSync from 'lib/uPlotV2/hooks/useLegendsSync';
import { Check, Copy } from 'lucide-react';
import { useLegendActions } from '../../hooks/useLegendActions';
import { LegendPosition, LegendProps } from '../types';
@@ -32,6 +34,7 @@ export default function Legend({
});
const legendContainerRef = useRef<HTMLDivElement | null>(null);
const [legendSearchQuery, setLegendSearchQuery] = useState('');
const { copyToClipboard, id: copiedId } = useCopyToClipboard();
const legendItems = useMemo(() => Object.values(legendItemsMap), [
legendItemsMap,
@@ -59,26 +62,53 @@ export default function Legend({
);
}, [position, legendSearchQuery, legendItems]);
const handleCopyLegendItem = useCallback(
(e: React.MouseEvent, seriesIndex: number, label: string): void => {
e.stopPropagation();
copyToClipboard(label, seriesIndex);
},
[copyToClipboard],
);
const renderLegendItem = useCallback(
(item: LegendItem): JSX.Element => (
<AntdTooltip key={item.seriesIndex} title={item.label}>
(item: LegendItem): JSX.Element => {
const isCopied = copiedId === item.seriesIndex;
return (
<div
key={item.seriesIndex}
data-legend-item-id={item.seriesIndex}
className={cx('legend-item', `legend-item-${position.toLowerCase()}`, {
'legend-item-off': !item.show,
'legend-item-focused': focusedSeriesIndex === item.seriesIndex,
})}
>
<div
className="legend-marker"
style={{ borderColor: String(item.color) }}
data-is-legend-marker={true}
/>
<span className="legend-label">{item.label}</span>
<AntdTooltip title={item.label}>
<div className="legend-item-label-trigger">
<div
className="legend-marker"
style={{ borderColor: String(item.color) }}
data-is-legend-marker={true}
/>
<span className="legend-label">{item.label}</span>
</div>
</AntdTooltip>
<AntdTooltip title={isCopied ? 'Copied' : 'Copy'}>
<button
type="button"
className="legend-copy-button"
onClick={(e): void =>
handleCopyLegendItem(e, item.seriesIndex, item.label ?? '')
}
aria-label={`Copy ${item.label}`}
data-testid="legend-copy"
>
{isCopied ? <Check size={12} /> : <Copy size={12} />}
</button>
</AntdTooltip>
</div>
</AntdTooltip>
),
[focusedSeriesIndex, position],
);
},
[copiedId, focusedSeriesIndex, handleCopyLegendItem, position],
);
const isEmptyState = useMemo(() => {

View File

@@ -1,5 +1,11 @@
import React from 'react';
import { render, RenderResult, screen } from '@testing-library/react';
import {
fireEvent,
render,
RenderResult,
screen,
within,
} from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import { LegendItem } from 'lib/uPlotV2/config/types';
import useLegendsSync from 'lib/uPlotV2/hooks/useLegendsSync';
@@ -8,6 +14,9 @@ import { useLegendActions } from '../../hooks/useLegendActions';
import Legend from '../Legend/Legend';
import { LegendPosition } from '../types';
const mockWriteText = jest.fn().mockResolvedValue(undefined);
let clipboardSpy: jest.SpyInstance | undefined;
jest.mock('react-virtuoso', () => ({
VirtuosoGrid: ({
data,
@@ -39,6 +48,15 @@ const mockUseLegendActions = useLegendActions as jest.MockedFunction<
>;
describe('Legend', () => {
beforeAll(() => {
// JSDOM does not define navigator.clipboard; add it so we can spy on writeText
Object.defineProperty(navigator, 'clipboard', {
value: { writeText: () => Promise.resolve() },
writable: true,
configurable: true,
});
});
const baseLegendItemsMap = {
0: {
seriesIndex: 0,
@@ -70,6 +88,11 @@ describe('Legend', () => {
onLegendMouseMove = jest.fn();
onLegendMouseLeave = jest.fn();
onFocusSeries = jest.fn();
mockWriteText.mockClear();
clipboardSpy = jest
.spyOn(navigator.clipboard, 'writeText')
.mockImplementation(mockWriteText);
mockUseLegendsSync.mockReturnValue({
legendItemsMap: baseLegendItemsMap,
@@ -86,6 +109,7 @@ describe('Legend', () => {
});
afterEach(() => {
clipboardSpy?.mockRestore();
jest.clearAllMocks();
});
@@ -210,4 +234,47 @@ describe('Legend', () => {
expect(onLegendMouseLeave).toHaveBeenCalledTimes(1);
});
});
describe('copy action', () => {
it('copies the legend label to clipboard when copy button is clicked', () => {
renderLegend(LegendPosition.RIGHT);
const firstLegendItem = document.querySelector(
'[data-legend-item-id="0"]',
) as HTMLElement;
const copyButton = within(firstLegendItem).getByTestId('legend-copy');
fireEvent.click(copyButton);
expect(mockWriteText).toHaveBeenCalledTimes(1);
expect(mockWriteText).toHaveBeenCalledWith('A');
});
it('copies the correct label when copy is clicked on a different legend item', () => {
renderLegend(LegendPosition.RIGHT);
const thirdLegendItem = document.querySelector(
'[data-legend-item-id="2"]',
) as HTMLElement;
const copyButton = within(thirdLegendItem).getByTestId('legend-copy');
fireEvent.click(copyButton);
expect(mockWriteText).toHaveBeenCalledTimes(1);
expect(mockWriteText).toHaveBeenCalledWith('C');
});
it('does not call onLegendClick when copy button is clicked', () => {
renderLegend(LegendPosition.RIGHT);
const firstLegendItem = document.querySelector(
'[data-legend-item-id="0"]',
) as HTMLElement;
const copyButton = within(firstLegendItem).getByTestId('legend-copy');
fireEvent.click(copyButton);
expect(onLegendClick).not.toHaveBeenCalled();
});
});
});

View File

@@ -3,9 +3,8 @@ package flagger
import "github.com/SigNoz/signoz/pkg/types/featuretypes"
var (
FeatureUseSpanMetrics = featuretypes.MustNewName("use_span_metrics")
FeatureInterpolationEnabled = featuretypes.MustNewName("interpolation_enabled")
FeatureKafkaSpanEval = featuretypes.MustNewName("kafka_span_eval")
FeatureUseSpanMetrics = featuretypes.MustNewName("use_span_metrics")
FeatureKafkaSpanEval = featuretypes.MustNewName("kafka_span_eval")
)
func MustNewRegistry() featuretypes.Registry {
@@ -18,14 +17,6 @@ func MustNewRegistry() featuretypes.Registry {
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
&featuretypes.Feature{
Name: FeatureInterpolationEnabled,
Kind: featuretypes.KindBoolean,
Stage: featuretypes.StageExperimental,
Description: "Controls whether to enable interpolation",
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
&featuretypes.Feature{
Name: FeatureKafkaSpanEval,
Kind: featuretypes.KindBoolean,

View File

@@ -483,6 +483,22 @@ func (v *filterExpressionVisitor) VisitComparison(ctx *grammar.ComparisonContext
value1 := v.Visit(values[0])
value2 := v.Visit(values[1])
switch value1.(type) {
case float64:
if _, ok := value2.(float64); !ok {
v.errors = append(v.errors, fmt.Sprintf("value type mismatch for key %s: expected number for both operands", keys[0].Name))
return ""
}
case string:
if _, ok := value2.(string); !ok {
v.errors = append(v.errors, fmt.Sprintf("value type mismatch for key %s: expected string for both operands", keys[0].Name))
return ""
}
default:
v.errors = append(v.errors, fmt.Sprintf("value type mismatch for key %s: operands must be number or string", keys[0].Name))
return ""
}
var conds []string
for _, key := range keys {
condition, err := v.conditionBuilder.ConditionFor(context.Background(), key, op, []any{value1, value2}, v.builder, v.startNs, v.endNs)
@@ -855,7 +871,7 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
// 1. either user meant key ( this is already handled above in fieldKeysForName )
// 2. or user meant `attribute.key` we look up in the map for all possible field keys with name 'attribute.key'
// Note:
// Note:
// If user only wants to search `attribute.key`, then they have to use `attribute.attribute.key`
// If user only wants to search `key`, then they have to use `key`
// If user wants to search both, they can use `attribute.key` and we will resolve the ambiguity

View File

@@ -375,13 +375,6 @@ func mergeAndEnsureBackwardCompatibility(ctx context.Context, logger *slog.Logge
config.Flagger.Config.Boolean[flagger.FeatureKafkaSpanEval.String()] = os.Getenv("KAFKA_SPAN_EVAL") == "true"
}
if os.Getenv("INTERPOLATION_ENABLED") != "" {
logger.WarnContext(ctx, "[Deprecated] env INTERPOLATION_ENABLED is deprecated and scheduled for removal. Please use SIGNOZ_FLAGGER_CONFIG_BOOLEAN_INTERPOLATION__ENABLED instead.")
if config.Flagger.Config.Boolean == nil {
config.Flagger.Config.Boolean = make(map[string]bool)
}
config.Flagger.Config.Boolean[flagger.FeatureInterpolationEnabled.String()] = os.Getenv("INTERPOLATION_ENABLED") == "true"
}
}
func (config Config) Collect(_ context.Context, _ valuer.UUID) (map[string]any, error) {

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
@@ -91,8 +92,9 @@ func (b *meterQueryStatementBuilder) buildPipelineStatement(
}
// spatial_aggregation_cte
frag, args := b.buildSpatialAggregationCTE(ctx, start, end, query, keys)
if frag != "" {
if frag, args, err := b.buildSpatialAggregationCTE(ctx, start, end, query, keys); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
@@ -128,7 +130,10 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
}
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", []any{}, err
}
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
}
@@ -208,8 +213,11 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
}
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality,
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality,
query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", nil, err
}
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
}
@@ -278,7 +286,10 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
}
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", nil, err
}
baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
@@ -315,25 +326,23 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
switch query.Aggregations[0].TimeAggregation {
case metrictypes.TimeAggregationRate:
rateExpr := fmt.Sprintf(telemetrymetrics.RateWithoutNegative, start, start)
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", rateExpr))
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", telemetrymetrics.RateTmpl))
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
case metrictypes.TimeAggregationIncrease:
incExpr := fmt.Sprintf(telemetrymetrics.IncreaseWithoutNegative, start, start)
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", incExpr))
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", telemetrymetrics.IncreaseTmpl))
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
@@ -348,7 +357,15 @@ func (b *meterQueryStatementBuilder) buildSpatialAggregationCTE(
_ uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
_ map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, []any) {
) (string, []any, error) {
if query.Aggregations[0].SpaceAggregation.IsZero() {
return "", []any{}, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"invalid space aggregation, should be one of the following: [`sum`, `avg`, `min`, `max`, `count`]",
)
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ts")
@@ -365,5 +382,5 @@ func (b *meterQueryStatementBuilder) buildSpatialAggregationCTE(
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args, nil
}

View File

@@ -3,6 +3,7 @@ package telemetrymeter
import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
)
@@ -63,7 +64,7 @@ func AggregationColumnForSamplesTable(
temporality metrictypes.Temporality,
timeAggregation metrictypes.TimeAggregation,
tableHints *metrictypes.MetricTableHints,
) string {
) (string, error) {
tableName := WhichSamplesTableToUse(start, end, metricType, timeAggregation, tableHints)
var aggregationColumn string
switch temporality {
@@ -190,5 +191,13 @@ func AggregationColumnForSamplesTable(
}
}
return aggregationColumn
if aggregationColumn == "" {
return "", errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"invalid time aggregation, should be one of the following: [`latest`, `sum`, `avg`, `min`, `max`, `count`, `rate`, `increase`]",
)
}
return aggregationColumn, nil
}

View File

@@ -29,13 +29,7 @@ func (c *conditionBuilder) conditionFor(
sb *sqlbuilder.SelectBuilder,
) (string, error) {
switch operator {
case qbtypes.FilterOperatorContains,
qbtypes.FilterOperatorNotContains,
qbtypes.FilterOperatorILike,
qbtypes.FilterOperatorNotILike,
qbtypes.FilterOperatorLike,
qbtypes.FilterOperatorNotLike:
if operator.IsStringSearchOperator() {
value = querybuilder.FormatValueForContains(value)
}
@@ -44,6 +38,18 @@ func (c *conditionBuilder) conditionFor(
return "", err
}
// todo(srikanthccv): use the same data type collision handling when metrics schemas are updated
switch v := value.(type) {
case float64:
tblFieldName = fmt.Sprintf("toFloat64OrNull(%s)", tblFieldName)
case []any:
if len(v) > 0 && (operator == qbtypes.FilterOperatorBetween || operator == qbtypes.FilterOperatorNotBetween) {
if _, ok := v[0].(float64); ok {
tblFieldName = fmt.Sprintf("toFloat64OrNull(%s)", tblFieldName)
}
}
}
switch operator {
case qbtypes.FilterOperatorEqual:
return sb.E(tblFieldName, value), nil

View File

@@ -5,67 +5,27 @@ import (
"fmt"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/featuretypes"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/huandu/go-sqlbuilder"
"golang.org/x/exp/slices"
)
const (
RateWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
IncreaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, ((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)) * (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
RateTmpl = `multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window))`
RateWithoutNegativeMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, IF((%s - lagInFrame(%s, 1, 0) OVER rate_window) < 0, %s / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window), (%s - lagInFrame(%s, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))) AS per_series_value`
IncreaseWithoutNegativeMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, IF((%s - lagInFrame(%s, 1, 0) OVER rate_window) < 0, %s, ((%s - lagInFrame(%s, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)) * (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))) AS per_series_value`
OthersMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, %s) AS per_series_value`
IncreaseTmpl = `multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value, per_series_value - lagInFrame(per_series_value, 1) OVER rate_window)`
RateWithInterpolation = `
CASE
WHEN row_number() OVER rate_window = 1 THEN
-- First row: try to interpolate using next value
CASE
WHEN leadInFrame(per_series_value, 1) OVER rate_window IS NOT NULL THEN
-- Assume linear growth to next point
(leadInFrame(per_series_value, 1) OVER rate_window - per_series_value) /
(leadInFrame(ts, 1) OVER rate_window - ts)
ELSE
0 -- No next value either, can't interpolate
END
WHEN (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0 THEN
-- Counter reset detected
per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window)
ELSE
-- Normal case: calculate rate
(per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) /
(ts - lagInFrame(ts, 1) OVER rate_window)
END`
RateWithoutNegativeMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, multiIf(row_number() OVER rate_window = 1, nan, (%s - lagInFrame(%s, 1) OVER rate_window) < 0, %s / (ts - lagInFrame(ts, 1) OVER rate_window), (%s - lagInFrame(%s, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window))) AS per_series_value`
IncreaseWithInterpolation = `
CASE
WHEN row_number() OVER rate_window = 1 THEN
-- First row: try to interpolate using next value
CASE
WHEN leadInFrame(per_series_value, 1) OVER rate_window IS NOT NULL THEN
-- Calculate the interpolated increase for this interval
((leadInFrame(per_series_value, 1) OVER rate_window - per_series_value) /
(leadInFrame(ts, 1) OVER rate_window - ts)) *
(leadInFrame(ts, 1) OVER rate_window - ts)
ELSE
0 -- No next value either, can't interpolate
END
WHEN (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0 THEN
-- Counter reset detected: the increase is the current value
per_series_value
ELSE
-- Normal case: calculate increase
(per_series_value - lagInFrame(per_series_value, 1) OVER rate_window)
END`
IncreaseWithoutNegativeMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, multiIf(row_number() OVER rate_window = 1, nan, (%s - lagInFrame(%s, 1) OVER rate_window) < 0, %s, (%s - lagInFrame(%s, 1) OVER rate_window))) AS per_series_value`
OthersMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, %s) AS per_series_value`
)
type MetricQueryStatementBuilder struct {
@@ -258,8 +218,9 @@ func (b *MetricQueryStatementBuilder) buildPipelineStatement(
if b.CanShortCircuitDelta(query) {
// spatial_aggregation_cte directly for certain delta queries
frag, args := b.buildTemporalAggDeltaFastPath(start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
if frag != "" {
if frag, args, err := b.buildTemporalAggDeltaFastPath(start, end, query, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
@@ -273,8 +234,9 @@ func (b *MetricQueryStatementBuilder) buildPipelineStatement(
}
// spatial_aggregation_cte
frag, args := b.buildSpatialAggregationCTE(ctx, start, end, query, keys)
if frag != "" {
if frag, args, err := b.buildSpatialAggregationCTE(ctx, start, end, query, keys); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
@@ -294,7 +256,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any) {
) (string, []any, error) {
stepSec := int64(query.StepInterval.Seconds())
sb := sqlbuilder.NewSelectBuilder()
@@ -307,10 +269,13 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
aggCol := AggregationColumnForSamplesTable(
aggCol, err := AggregationColumnForSamplesTable(
start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality,
query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints,
)
if err != nil {
return "", []any{}, err
}
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
}
@@ -334,7 +299,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args, nil
}
func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
@@ -437,7 +402,10 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", []any{}, err
}
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
}
@@ -461,7 +429,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
}
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
ctx context.Context,
_ context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
timeSeriesCTE string,
@@ -479,7 +447,10 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
baseSb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", []any{}, err
}
baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
@@ -496,36 +467,25 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
// ! TODO (balanikaran) Get OrgID via function parameter instead of valuer.GenerateUUID()
interpolationEnabled := b.flagger.BooleanOrEmpty(ctx, flagger.FeatureInterpolationEnabled, featuretypes.NewFlaggerEvaluationContext(valuer.GenerateUUID()))
switch query.Aggregations[0].TimeAggregation {
case metrictypes.TimeAggregationRate:
rateExpr := fmt.Sprintf(RateWithoutNegative, start, start)
if interpolationEnabled {
rateExpr = RateWithInterpolation
}
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", rateExpr))
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", RateTmpl))
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
case metrictypes.TimeAggregationIncrease:
incExpr := fmt.Sprintf(IncreaseWithoutNegative, start, start)
if interpolationEnabled {
incExpr = IncreaseWithInterpolation
}
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", incExpr))
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", IncreaseTmpl))
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
@@ -534,7 +494,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
}
}
// because RateInterpolation is not enabled anywhere due to some gaps in the logic wrt cache handling, it hasn't been considered for the multi temporality
func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
_ context.Context,
start, end uint64,
@@ -553,18 +512,32 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
aggForDeltaTemporality := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, metrictypes.Delta, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
aggForCumulativeTemporality := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, metrictypes.Cumulative, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
aggForDeltaTemporality, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, metrictypes.Delta, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", []any{}, err
}
aggForCumulativeTemporality, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, metrictypes.Cumulative, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", []any{}, err
}
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
aggForDeltaTemporality = fmt.Sprintf("%s/%d", aggForDeltaTemporality, stepSec)
}
switch query.Aggregations[0].TimeAggregation {
case metrictypes.TimeAggregationRate:
rateExpr := fmt.Sprintf(RateWithoutNegativeMultiTemporality, aggForDeltaTemporality, aggForCumulativeTemporality, aggForCumulativeTemporality, aggForCumulativeTemporality, start, aggForCumulativeTemporality, aggForCumulativeTemporality, start)
rateExpr := fmt.Sprintf(RateWithoutNegativeMultiTemporality,
aggForDeltaTemporality,
aggForCumulativeTemporality, aggForCumulativeTemporality, aggForCumulativeTemporality,
aggForCumulativeTemporality, aggForCumulativeTemporality,
)
sb.SelectMore(rateExpr)
case metrictypes.TimeAggregationIncrease:
increaseExpr := fmt.Sprintf(IncreaseWithoutNegativeMultiTemporality, aggForDeltaTemporality, aggForCumulativeTemporality, aggForCumulativeTemporality, aggForCumulativeTemporality, aggForCumulativeTemporality, aggForCumulativeTemporality, start, start)
increaseExpr := fmt.Sprintf(IncreaseWithoutNegativeMultiTemporality,
aggForDeltaTemporality,
aggForCumulativeTemporality, aggForCumulativeTemporality, aggForCumulativeTemporality,
aggForCumulativeTemporality, aggForCumulativeTemporality,
)
sb.SelectMore(increaseExpr)
default:
expr := fmt.Sprintf(OthersMultiTemporality, aggForDeltaTemporality, aggForCumulativeTemporality)
@@ -592,7 +565,14 @@ func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
_ uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
_ map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, []any) {
) (string, []any, error) {
if query.Aggregations[0].SpaceAggregation.IsZero() {
return "", []any{}, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"invalid space aggregation, should be one of the following: [`sum`, `avg`, `min`, `max`, `count`, `p50`, `p75`, `p90`, `p95`, `p99`]",
)
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ts")
@@ -609,7 +589,7 @@ func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args, nil
}
func (b *MetricQueryStatementBuilder) BuildFinalSelect(

View File

@@ -3,6 +3,7 @@ package telemetrymetrics
import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
)
@@ -168,7 +169,7 @@ func AggregationColumnForSamplesTable(
temporality metrictypes.Temporality,
timeAggregation metrictypes.TimeAggregation,
tableHints *metrictypes.MetricTableHints,
) string {
) (string, error) {
tableName := WhichSamplesTableToUse(start, end, metricType, timeAggregation, tableHints)
var aggregationColumn string
switch temporality {
@@ -298,5 +299,12 @@ func AggregationColumnForSamplesTable(
}
}
}
return aggregationColumn
if aggregationColumn == "" {
return "", errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"invalid time aggregation, should be one of the following: [`latest`, `sum`, `avg`, `min`, `max`, `count`, `rate`, `increase`]",
)
}
return aggregationColumn, nil
}

View File

@@ -35,13 +35,7 @@ func (c *conditionBuilder) conditionFor(
sb *sqlbuilder.SelectBuilder,
) (string, error) {
switch operator {
case qbtypes.FilterOperatorContains,
qbtypes.FilterOperatorNotContains,
qbtypes.FilterOperatorILike,
qbtypes.FilterOperatorNotILike,
qbtypes.FilterOperatorLike,
qbtypes.FilterOperatorNotLike:
if operator.IsStringSearchOperator() {
value = querybuilder.FormatValueForContains(value)
}

View File

@@ -152,7 +152,9 @@ func (f FilterOperator) IsStringSearchOperator() bool {
FilterOperatorILike,
FilterOperatorNotILike,
FilterOperatorLike,
FilterOperatorNotLike:
FilterOperatorNotLike,
FilterOperatorRegexp,
FilterOperatorNotRegexp:
return true
default:
return false

View File

@@ -12,6 +12,7 @@ import (
var (
ErrColumnNotFound = errors.Newf(errors.TypeNotFound, errors.CodeNotFound, "field not found")
ErrBetweenValues = errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "(not) between operator requires two values")
ErrBetweenValuesType = errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "(not) between operator requires two values of the number type")
ErrInValues = errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "(not) in operator requires a list of values")
ErrUnsupportedOperator = errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "unsupported operator")
)

View File

@@ -179,14 +179,6 @@ func (q *QueryBuilderQuery[T]) validateAggregations() error {
aggId,
)
}
// Validate metric-specific aggregations
if err := validateMetricAggregation(v); err != nil {
aggId := fmt.Sprintf("aggregation #%d", i+1)
if q.Name != "" {
aggId = fmt.Sprintf("aggregation #%d in query '%s'", i+1, q.Name)
}
return wrapValidationError(err, aggId, "invalid metric %s: %s")
}
case TraceAggregation:
if v.Expression == "" {
aggId := fmt.Sprintf("aggregation #%d", i+1)
@@ -803,85 +795,3 @@ func validateQueryEnvelope(envelope QueryEnvelope, requestType RequestType) erro
)
}
}
// validateMetricAggregation validates metric-specific aggregation parameters
func validateMetricAggregation(agg MetricAggregation) error {
// we can't decide anything here without known temporality
if agg.Temporality == metrictypes.Unknown {
return nil
}
// Validate that rate/increase are only used with appropriate temporalities
if agg.TimeAggregation == metrictypes.TimeAggregationRate || agg.TimeAggregation == metrictypes.TimeAggregationIncrease {
// For gauge metrics (Unspecified temporality), rate/increase doesn't make sense
if agg.Temporality == metrictypes.Unspecified {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"rate/increase aggregation cannot be used with gauge metrics (unspecified temporality)",
)
}
}
// Validate percentile aggregations are only used with histogram types
if agg.SpaceAggregation.IsPercentile() {
if agg.Type != metrictypes.HistogramType && agg.Type != metrictypes.ExpHistogramType && agg.Type != metrictypes.SummaryType {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"percentile aggregation can only be used with histogram or summary metric types",
)
}
}
// Validate time aggregation values
validTimeAggregations := []metrictypes.TimeAggregation{
metrictypes.TimeAggregationUnspecified,
metrictypes.TimeAggregationLatest,
metrictypes.TimeAggregationSum,
metrictypes.TimeAggregationAvg,
metrictypes.TimeAggregationMin,
metrictypes.TimeAggregationMax,
metrictypes.TimeAggregationCount,
metrictypes.TimeAggregationCountDistinct,
metrictypes.TimeAggregationRate,
metrictypes.TimeAggregationIncrease,
}
validTimeAgg := slices.Contains(validTimeAggregations, agg.TimeAggregation)
if !validTimeAgg {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid time aggregation: %s",
agg.TimeAggregation.StringValue(),
).WithAdditional(
"Valid time aggregations: latest, sum, avg, min, max, count, count_distinct, rate, increase",
)
}
// Validate space aggregation values
validSpaceAggregations := []metrictypes.SpaceAggregation{
metrictypes.SpaceAggregationUnspecified,
metrictypes.SpaceAggregationSum,
metrictypes.SpaceAggregationAvg,
metrictypes.SpaceAggregationMin,
metrictypes.SpaceAggregationMax,
metrictypes.SpaceAggregationCount,
metrictypes.SpaceAggregationPercentile50,
metrictypes.SpaceAggregationPercentile75,
metrictypes.SpaceAggregationPercentile90,
metrictypes.SpaceAggregationPercentile95,
metrictypes.SpaceAggregationPercentile99,
}
validSpaceAgg := slices.Contains(validSpaceAggregations, agg.SpaceAggregation)
if !validSpaceAgg {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid space aggregation: %s",
agg.SpaceAggregation.StringValue(),
).WithAdditional(
"Valid space aggregations: sum, avg, min, max, count, p50, p75, p90, p95, p99",
)
}
return nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -54,17 +54,17 @@ def test_rate_with_steady_values_and_reset(
data = response.json()
result_values = sorted(get_series_values(data, "A"), key=lambda x: x["timestamp"])
assert len(result_values) >= 59
assert len(result_values) >= 58
# the counter reset happened at 31st minute
assert (
result_values[30]["value"] == 0.0167
result_values[29]["value"] == 0.0167
) # i.e 2/120 i.e 29th to 31st minute changes
assert (
result_values[31]["value"] == 0.133
result_values[30]["value"] == 0.133
) # i.e 10/60 i.e 31st to 32nd minute changes
count_of_steady_rate = sum(1 for v in result_values if v["value"] == 0.0833)
assert (
count_of_steady_rate >= 56
count_of_steady_rate >= 55
) # 59 - (1 reset + 1 high rate + 1 at the beginning)
# All rates should be non-negative (stale periods = 0 rate)
for v in result_values:

View File

@@ -21,8 +21,13 @@ from fixtures.querier import (
from fixtures.utils import get_testdata_file_path
MULTI_TEMPORALITY_FILE = get_testdata_file_path("multi_temporality_counters_1h.jsonl")
MULTI_TEMPORALITY_FILE_10h = get_testdata_file_path("multi_temporality_counters_10h.jsonl")
MULTI_TEMPORALITY_FILE_24h = get_testdata_file_path("multi_temporality_counters_24h.jsonl")
MULTI_TEMPORALITY_FILE_10h = get_testdata_file_path(
"multi_temporality_counters_10h.jsonl"
)
MULTI_TEMPORALITY_FILE_24h = get_testdata_file_path(
"multi_temporality_counters_24h.jsonl"
)
@pytest.mark.parametrize(
"time_aggregation, expected_value_at_31st_minute, expected_value_at_32nd_minute, steady_value",
@@ -39,7 +44,7 @@ def test_with_steady_values_and_reset(
time_aggregation: str,
expected_value_at_31st_minute: float,
expected_value_at_32nd_minute: float,
steady_value: float
steady_value: float,
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
@@ -67,24 +72,24 @@ def test_with_steady_values_and_reset(
data = response.json()
result_values = sorted(get_series_values(data, "A"), key=lambda x: x["timestamp"])
assert len(result_values) >= 59
assert len(result_values) >= 58
# the counter reset happened at 31st minute
# we skip the rate value for the first data point without previous value
assert result_values[29]["value"] == expected_value_at_31st_minute
assert result_values[30]["value"] == expected_value_at_32nd_minute
assert (
result_values[30]["value"] == expected_value_at_31st_minute
)
assert (
result_values[31]["value"] == expected_value_at_32nd_minute
)
assert (
result_values[39]["value"] == steady_value
) # 39th minute is when cumulative shifts to delta
result_values[38]["value"] == steady_value
) # 38th minute is when cumulative shifts to delta
count_of_steady_rate = sum(1 for v in result_values if v["value"] == steady_value)
assert (
count_of_steady_rate >= 56
count_of_steady_rate >= 55
) # 59 - (1 reset + 1 high rate + 1 at the beginning)
# All rates should be non-negative (stale periods = 0 rate)
for v in result_values:
assert v["value"] >= 0, f"{time_aggregation} should not be negative: {v['value']}"
assert (
v["value"] >= 0
), f"{time_aggregation} should not be negative: {v['value']}"
@pytest.mark.parametrize(
"time_aggregation, stable_health_value, stable_products_value, stable_checkout_value, spike_checkout_value, stable_orders_value, spike_users_value",
@@ -161,20 +166,26 @@ def test_group_by_endpoint(
assert (
len(health_values) >= 58
), f"Expected >= 58 values for /health, got {len(health_values)}"
count_steady_health = sum(1 for v in health_values if v["value"] == stable_health_value)
count_steady_health = sum(
1 for v in health_values if v["value"] == stable_health_value
)
assert (
count_steady_health >= 57
), f"Expected >= 57 steady rate values ({stable_health_value}) for /health, got {count_steady_health}"
# all /health rates should be state except possibly first/last due to boundaries
for v in health_values[1:-1]:
assert v["value"] == stable_health_value, f"Expected /health rate {stable_health_value}, got {v['value']}"
assert (
v["value"] == stable_health_value
), f"Expected /health rate {stable_health_value}, got {v['value']}"
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
products_values = endpoint_values["/products"]
assert (
len(products_values) >= 49
), f"Expected >= 49 values for /products, got {len(products_values)}"
count_steady_products = sum(1 for v in products_values if v["value"] == stable_products_value)
count_steady_products = sum(
1 for v in products_values if v["value"] == stable_products_value
)
# most values should be stable, some boundary values differ due to 10-min gap
assert (
@@ -182,7 +193,9 @@ def test_group_by_endpoint(
), f"Expected >= 46 steady rate values ({stable_products_value}) for /products, got {count_steady_products}"
# check that non-stable values are due to gap averaging (should be lower)
gap_boundary_values = [v["value"] for v in products_values if v["value"] != stable_products_value]
gap_boundary_values = [
v["value"] for v in products_values if v["value"] != stable_products_value
]
for val in gap_boundary_values:
assert (
0 < val < stable_products_value
@@ -193,12 +206,16 @@ def test_group_by_endpoint(
assert (
len(checkout_values) >= 59
), f"Expected >= 59 values for /checkout, got {len(checkout_values)}"
count_steady_checkout = sum(1 for v in checkout_values if v["value"] == stable_checkout_value)
count_steady_checkout = sum(
1 for v in checkout_values if v["value"] == stable_checkout_value
)
assert (
count_steady_checkout >= 53
), f"Expected >= 53 steady {time_aggregation} values ({stable_checkout_value}) for /checkout, got {count_steady_checkout}"
# check that spike values exist (traffic spike +50/min at t40-t44)
count_spike_checkout = sum(1 for v in checkout_values if v["value"] == spike_checkout_value)
count_spike_checkout = sum(
1 for v in checkout_values if v["value"] == spike_checkout_value
)
assert (
count_spike_checkout >= 4
), f"Expected >= 4 spike {time_aggregation} values ({spike_checkout_value}) for /checkout, got {count_spike_checkout}"
@@ -220,12 +237,16 @@ def test_group_by_endpoint(
assert (
len(orders_values) >= 58
), f"Expected >= 58 values for /orders, got {len(orders_values)}"
count_steady_orders = sum(1 for v in orders_values if v["value"] == stable_orders_value)
count_steady_orders = sum(
1 for v in orders_values if v["value"] == stable_orders_value
)
assert (
count_steady_orders >= 55
), f"Expected >= 55 steady {time_aggregation} values ({stable_orders_value}) for /orders, got {count_steady_orders}"
# check for counter reset effects - there should be some non-standard values
non_standard_orders = [v["value"] for v in orders_values if v["value"] != stable_orders_value]
non_standard_orders = [
v["value"] for v in orders_values if v["value"] != stable_orders_value
]
assert (
len(non_standard_orders) >= 2
), f"Expected >= 2 non-standard values due to counter reset, got {non_standard_orders}"
@@ -252,6 +273,7 @@ def test_group_by_endpoint(
count_increment_rate >= 8
), f"Expected >= 8 increment {time_aggregation} values ({spike_users_value}) for /users, got {count_increment_rate}"
@pytest.mark.parametrize(
"time_aggregation, expected_value_at_30th_minute, expected_value_at_31st_minute, value_at_switch",
[
@@ -267,7 +289,7 @@ def test_for_service_with_switch(
time_aggregation: str,
expected_value_at_30th_minute: float,
expected_value_at_31st_minute: float,
value_at_switch: float
value_at_switch: float,
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
@@ -295,22 +317,19 @@ def test_for_service_with_switch(
data = response.json()
result_values = sorted(get_series_values(data, "A"), key=lambda x: x["timestamp"])
assert len(result_values) >= 60
assert len(result_values) >= 59
assert result_values[29]["value"] == expected_value_at_30th_minute # 0.183
assert result_values[30]["value"] == expected_value_at_31st_minute # 0.183
assert result_values[37]["value"] == value_at_switch # 0.25
assert (
result_values[30]["value"] == expected_value_at_30th_minute #0.183
)
assert (
result_values[31]["value"] == expected_value_at_31st_minute # 0.183
)
assert (
result_values[38]["value"] == value_at_switch # 0.25
)
assert (
result_values[39]["value"] == value_at_switch # 0.25
) # 39th minute is when cumulative shifts to delta
result_values[38]["value"] == value_at_switch # 0.25
) # 39th minute is when cumulative shifts to delta
# All rates should be non-negative (stale periods = 0 rate)
for v in result_values:
assert v["value"] >= 0, f"{time_aggregation} should not be negative: {v['value']}"
assert (
v["value"] >= 0
), f"{time_aggregation} should not be negative: {v['value']}"
@pytest.mark.parametrize(
"time_aggregation, expected_value",
@@ -355,6 +374,7 @@ def test_for_week_long_time_range(
for value in result_values[1:]:
assert value["value"] == expected_value
@pytest.mark.parametrize(
"time_aggregation, expected_value",
[