fix: fillZero usage in query builder and correct step values to ms (#9880)

This commit is contained in:
Niladri Adhikary
2026-01-04 17:00:16 +05:30
committed by GitHub
parent 9aa0073fef
commit 14ab4c9b79
11 changed files with 2954 additions and 21 deletions

View File

@@ -376,7 +376,7 @@ func funcFillZero(result *TimeSeries, start, end, step int64) *TimeSeries {
return result
}
alignedStart := start - (start % (step * 1000))
alignedStart := start - (start % step)
alignedEnd := end
existingValues := make(map[int64]*TimeSeriesValue)
@@ -386,7 +386,7 @@ func funcFillZero(result *TimeSeries, start, end, step int64) *TimeSeries {
filledValues := make([]*TimeSeriesValue, 0)
for ts := alignedStart; ts <= alignedEnd; ts += step * 1000 {
for ts := alignedStart; ts <= alignedEnd; ts += step {
if val, exists := existingValues[ts]; exists {
filledValues = append(filledValues, val)
} else {

View File

@@ -698,7 +698,7 @@ func TestFuncFillZero(t *testing.T) {
},
start: 1000,
end: 3000,
step: 1,
step: 1000,
expected: &TimeSeries{
Values: []*TimeSeriesValue{
{Timestamp: 1000, Value: 1.0},
@@ -717,7 +717,7 @@ func TestFuncFillZero(t *testing.T) {
},
start: 1000,
end: 3000,
step: 1,
step: 1000,
expected: &TimeSeries{
Values: []*TimeSeriesValue{
{Timestamp: 1000, Value: 1.0},
@@ -737,7 +737,7 @@ func TestFuncFillZero(t *testing.T) {
},
start: 1000,
end: 6000,
step: 1,
step: 1000,
expected: &TimeSeries{
Values: []*TimeSeriesValue{
{Timestamp: 1000, Value: 1.0},
@@ -761,7 +761,7 @@ func TestFuncFillZero(t *testing.T) {
},
start: 1000,
end: 6000,
step: 1,
step: 1000,
expected: &TimeSeries{
Values: []*TimeSeriesValue{
{Timestamp: 1000, Value: 1.0},
@@ -780,7 +780,7 @@ func TestFuncFillZero(t *testing.T) {
},
start: 1000,
end: 3000,
step: 1,
step: 1000,
expected: &TimeSeries{
Values: []*TimeSeriesValue{
{Timestamp: 1000, Value: 0},
@@ -798,7 +798,7 @@ func TestFuncFillZero(t *testing.T) {
},
start: 1000,
end: 3000,
step: 1,
step: 1000,
expected: &TimeSeries{
Values: []*TimeSeriesValue{
{Timestamp: 1000, Value: 1.0},
@@ -820,7 +820,7 @@ func TestFuncFillZero(t *testing.T) {
},
start: 1000,
end: 4000,
step: 1,
step: 1000,
expected: &TimeSeries{
Values: []*TimeSeriesValue{
{Timestamp: 1000, Value: 1.0},
@@ -841,7 +841,7 @@ func TestFuncFillZero(t *testing.T) {
},
start: 50000, // Not aligned to 60s
end: 250000, // Not aligned to 60s
step: 60, // 60 seconds
step: 60000, // 60 seconds
expected: &TimeSeries{
Values: []*TimeSeriesValue{
{Timestamp: 0, Value: 0}, // Aligned start
@@ -890,7 +890,7 @@ func TestApplyFunction_FillZero(t *testing.T) {
Args: []FunctionArg{
{Value: 1000.0}, // start
{Value: 4000.0}, // end
{Value: 1.0}, // step
{Value: 1000.0}, // step
},
}

View File

@@ -218,13 +218,13 @@ func (r *QueryRangeRequest) StepIntervalForQuery(name string) int64 {
for _, query := range r.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case QueryBuilderQuery[TraceAggregation]:
stepsMap[spec.Name] = int64(spec.StepInterval.Seconds())
stepsMap[spec.Name] = spec.StepInterval.Milliseconds()
case QueryBuilderQuery[LogAggregation]:
stepsMap[spec.Name] = int64(spec.StepInterval.Seconds())
stepsMap[spec.Name] = spec.StepInterval.Milliseconds()
case QueryBuilderQuery[MetricAggregation]:
stepsMap[spec.Name] = int64(spec.StepInterval.Seconds())
stepsMap[spec.Name] = spec.StepInterval.Milliseconds()
case PromQuery:
stepsMap[spec.Name] = int64(spec.Step.Seconds())
stepsMap[spec.Name] = spec.Step.Milliseconds()
}
}

View File

@@ -14,6 +14,7 @@ pytest_plugins = [
"fixtures.signoz",
"fixtures.logs",
"fixtures.traces",
"fixtures.metrics",
"fixtures.driver",
"fixtures.idp",
"fixtures.idputils",

View File

@@ -1,7 +1,7 @@
import datetime
import json
from abc import ABC
from typing import Any, Callable, Generator, List
from typing import Any, Callable, Generator, List, Optional
import numpy as np
import pytest
@@ -116,7 +116,7 @@ class Logs(ABC):
def __init__(
self,
timestamp: datetime.datetime = datetime.datetime.now(),
timestamp: Optional[datetime.datetime] = None,
resources: dict[str, Any] = {},
attributes: dict[str, Any] = {},
body: str = "default body",
@@ -128,6 +128,8 @@ class Logs(ABC):
scope_version: str = "",
scope_attributes: dict[str, str] = {},
) -> None:
if timestamp is None:
timestamp = datetime.datetime.now()
self.tag_attributes = []
self.attribute_keys = []
self.resource_keys = []

View File

@@ -0,0 +1,250 @@
import datetime
import hashlib
import json
from abc import ABC
from typing import Any, Callable, Generator, List, Optional
import numpy as np
import pytest
from fixtures import types
class MetricsTimeSeries(ABC):
"""Represents a row in the time_series_v4 table."""
env: str
temporality: str
metric_name: str
description: str
unit: str
type: str
is_monotonic: bool
fingerprint: np.uint64
unix_milli: np.int64
labels: str
attrs: dict[str, str]
scope_attrs: dict[str, str]
resource_attrs: dict[str, str]
__normalized: bool
def __init__(
self,
metric_name: str,
labels: dict[str, str],
timestamp: datetime.datetime,
temporality: str = "Unspecified",
description: str = "",
unit: str = "",
type_: str = "Sum",
is_monotonic: bool = True,
env: str = "default",
resource_attrs: dict[str, str] = {},
scope_attrs: dict[str, str] = {},
) -> None:
self.env = env
self.metric_name = metric_name
self.temporality = temporality
self.description = description
self.unit = unit
self.type = type_
self.is_monotonic = is_monotonic
self.labels = json.dumps(labels, separators=(",", ":"))
self.attrs = labels
self.scope_attrs = scope_attrs
self.resource_attrs = resource_attrs
self.unix_milli = np.int64(int(timestamp.timestamp() * 1e3))
self.__normalized = False
# Calculate fingerprint from metric_name + labels
fingerprint_str = metric_name + self.labels
self.fingerprint = np.uint64(
int(hashlib.md5(fingerprint_str.encode()).hexdigest()[:16], 16)
)
def to_row(self) -> list:
return [
self.env,
self.temporality,
self.metric_name,
self.description,
self.unit,
self.type,
self.is_monotonic,
self.fingerprint,
self.unix_milli,
self.labels,
self.attrs,
self.scope_attrs,
self.resource_attrs,
self.__normalized,
]
class MetricsSample(ABC):
"""Represents a row in the samples_v4 table."""
env: str
temporality: str
metric_name: str
fingerprint: np.uint64
unix_milli: np.int64
value: np.float64
flags: np.uint32
def __init__(
self,
metric_name: str,
fingerprint: np.uint64,
timestamp: datetime.datetime,
value: float,
temporality: str = "Unspecified",
env: str = "default",
flags: int = 0,
) -> None:
self.env = env
self.temporality = temporality
self.metric_name = metric_name
self.fingerprint = fingerprint
self.unix_milli = np.int64(int(timestamp.timestamp() * 1e3))
self.value = np.float64(value)
self.flags = np.uint32(flags)
def to_row(self) -> list:
return [
self.env,
self.temporality,
self.metric_name,
self.fingerprint,
self.unix_milli,
self.value,
self.flags,
]
class Metrics(ABC):
"""High-level metric representation. Produces both time series and sample entries."""
metric_name: str
labels: dict[str, str]
temporality: str
timestamp: datetime.datetime
value: float
flags: int
_time_series: MetricsTimeSeries
_sample: MetricsSample
def __init__(
self,
metric_name: str,
labels: dict[str, str] = {},
timestamp: Optional[datetime.datetime] = None,
value: float = 0.0,
temporality: str = "Unspecified",
flags: int = 0,
description: str = "",
unit: str = "",
type_: str = "Sum",
is_monotonic: bool = True,
env: str = "default",
resource_attributes: dict[str, str] = {},
scope_attributes: dict[str, str] = {},
) -> None:
if timestamp is None:
timestamp = datetime.datetime.now()
self.metric_name = metric_name
self.labels = labels
self.temporality = temporality
self.timestamp = timestamp
self.value = value
self.flags = flags
self._time_series = MetricsTimeSeries(
metric_name=metric_name,
labels=labels,
timestamp=timestamp,
temporality=temporality,
description=description,
unit=unit,
type_=type_,
is_monotonic=is_monotonic,
env=env,
resource_attrs=resource_attributes,
scope_attrs=scope_attributes,
)
self._sample = MetricsSample(
metric_name=metric_name,
fingerprint=self._time_series.fingerprint,
timestamp=timestamp,
value=value,
temporality=temporality,
env=env,
flags=flags,
)
@pytest.fixture(name="insert_metrics", scope="function")
def insert_metrics(
clickhouse: types.TestContainerClickhouse,
) -> Generator[Callable[[List[Metrics]], None], Any, None]:
def _insert_metrics(metrics: List[Metrics]) -> None:
"""
Insert metrics into ClickHouse tables.
This function handles insertion into:
- distributed_time_series_v4 (time series metadata)
- distributed_samples_v4 (actual sample values)
"""
time_series_map: dict[int, MetricsTimeSeries] = {}
for metric in metrics:
fp = int(metric._time_series.fingerprint)
if fp not in time_series_map:
time_series_map[fp] = metric._time_series
if len(time_series_map) > 0:
clickhouse.conn.insert(
database="signoz_metrics",
table="distributed_time_series_v4",
column_names=[
"env",
"temporality",
"metric_name",
"description",
"unit",
"type",
"is_monotonic",
"fingerprint",
"unix_milli",
"labels",
"attrs",
"scope_attrs",
"resource_attrs",
"__normalized",
],
data=[ts.to_row() for ts in time_series_map.values()],
)
samples = [metric._sample for metric in metrics]
if len(samples) > 0:
clickhouse.conn.insert(
database="signoz_metrics",
table="distributed_samples_v4",
column_names=[
"env",
"temporality",
"metric_name",
"fingerprint",
"unix_milli",
"value",
"flags",
],
data=[sample.to_row() for sample in samples],
)
yield _insert_metrics
# Cleanup
clickhouse.conn.query(
f"TRUNCATE TABLE signoz_metrics.time_series_v4 ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
)
clickhouse.conn.query(
f"TRUNCATE TABLE signoz_metrics.samples_v4 ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
)

View File

@@ -5,7 +5,7 @@ import secrets
import uuid
from abc import ABC
from enum import Enum
from typing import Any, Callable, Generator, List
from typing import Any, Callable, Generator, List, Optional
from urllib.parse import urlparse
import numpy as np
@@ -254,7 +254,7 @@ class Traces(ABC):
def __init__(
self,
timestamp: datetime.datetime = datetime.datetime.now(),
timestamp: Optional[datetime.datetime] = None,
duration: datetime.timedelta = datetime.timedelta(seconds=1),
trace_id: str = "",
span_id: str = "",
@@ -270,6 +270,8 @@ class Traces(ABC):
trace_state: str = "",
flags: np.uint32 = 0,
) -> None:
if timestamp is None:
timestamp = datetime.datetime.now()
self.tag_attributes = []
self.attribute_keys = []
self.resource_keys = []

View File

@@ -1,12 +1,17 @@
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Callable, List
from typing import Callable, Dict, List
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.logs import Logs
from src.querier.timeseries_utils import (
assert_minutely_bucket_values,
find_named_result,
index_series_by_label,
)
def test_logs_list(
@@ -1333,3 +1338,806 @@ def test_datatype_collision(
count = results[0]["data"][0][0]
# Should return 1 log with empty http.status_code (edge case log)
assert count == 1
def test_logs_fill_gaps(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Test fillGaps for logs without groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
logs: List[Logs] = [
Logs(
timestamp=now - timedelta(minutes=3),
resources={"service.name": "test-service"},
attributes={"code.file": "test.py"},
body="Log at minute 3",
severity_text="INFO",
),
Logs(
timestamp=now - timedelta(minutes=1),
resources={"service.name": "test-service"},
attributes={"code.file": "test.py"},
body="Log at minute 1",
severity_text="INFO",
),
]
insert_logs(logs)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"stepInterval": 60,
"disabled": False,
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": True},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0]["aggregations"]
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) >= 1
values = series[0]["values"]
# Logs are exactly at minute -3 and minute -1, so counts should be 1 there and 0 everywhere else
ts_min_1 = int((now - timedelta(minutes=1)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
assert_minutely_bucket_values(
values,
now,
expected_by_ts={ts_min_1: 1, ts_min_3: 1},
context="logs/fillGaps",
)
def test_logs_fill_gaps_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Test fillGaps for logs with groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
logs: List[Logs] = [
Logs(
timestamp=now - timedelta(minutes=3),
resources={"service.name": "service-a"},
attributes={"code.file": "test.py"},
body="Log from service A",
severity_text="INFO",
),
Logs(
timestamp=now - timedelta(minutes=2),
resources={"service.name": "service-b"},
attributes={"code.file": "test.py"},
body="Log from service B",
severity_text="ERROR",
),
]
insert_logs(logs)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"stepInterval": 60,
"disabled": False,
"groupBy": [
{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}
],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": True},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0]["aggregations"]
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) == 2, "Expected 2 series for 2 service groups"
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
series_by_service = index_series_by_label(series, "service.name")
assert set(series_by_service.keys()) == {"service-a", "service-b"}
# service-a has one log at minute -3, service-b at minute -2
expectations = {
"service-a": {ts_min_3: 1},
"service-b": {ts_min_2: 1},
}
for service_name, s in series_by_service.items():
assert_minutely_bucket_values(
s["values"],
now,
expected_by_ts=expectations[service_name],
context=f"logs/fillGaps/{service_name}",
)
def test_logs_fill_gaps_formula(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Test fillGaps for logs with formula.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
logs: List[Logs] = [
Logs(
timestamp=now - timedelta(minutes=3),
resources={"service.name": "test"},
attributes={"code.file": "test.py"},
body="Test log",
severity_text="INFO",
),
Logs(
timestamp=now - timedelta(minutes=2),
resources={"service.name": "another-test"},
attributes={"code.file": "test.py"},
body="Another test log",
severity_text="INFO",
),
]
insert_logs(logs)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"stepInterval": 60,
"disabled": True,
"filter": {"expression": "service.name = 'test'"},
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "logs",
"stepInterval": 60,
"disabled": True,
"filter": {
"expression": "service.name = 'another-test'"
},
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_formula",
"spec": {
"name": "F1",
"expression": "A + B",
"disabled": False,
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": True},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
f1 = find_named_result(results, "F1")
assert f1 is not None, "Expected formula result named F1"
aggregations = f1.get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) >= 1
assert_minutely_bucket_values(
series[0]["values"],
now,
expected_by_ts={ts_min_3: 1, ts_min_2: 1},
context="logs/fillGaps/F1",
)
def test_logs_fill_gaps_formula_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Test fillGaps for logs with formula and groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
logs: List[Logs] = [
Logs(
timestamp=now - timedelta(minutes=3),
resources={"service.name": "group1"},
attributes={"code.file": "test.py"},
body="Test log",
severity_text="INFO",
),
Logs(
timestamp=now - timedelta(minutes=2),
resources={"service.name": "group2"},
attributes={"code.file": "test.py"},
body="Test log 2",
severity_text="INFO",
),
]
insert_logs(logs)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"stepInterval": 60,
"disabled": True,
"groupBy": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "logs",
"stepInterval": 60,
"disabled": True,
"groupBy": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_formula",
"spec": {
"name": "F1",
"expression": "A + B",
"disabled": False,
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": True},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
f1 = find_named_result(results, "F1")
assert f1 is not None, "Expected formula result named F1"
aggregations = f1.get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) == 2
series_by_service = index_series_by_label(series, "service.name")
assert set(series_by_service.keys()) == {"group1", "group2"}
expectations = {
"group1": {ts_min_3: 2},
"group2": {ts_min_2: 2},
}
for service_name, s in series_by_service.items():
assert_minutely_bucket_values(
s["values"],
now,
expected_by_ts=expectations[service_name],
context=f"logs/fillGaps/F1/{service_name}",
)
def test_logs_fill_zero(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Test fillZero function for logs without groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
logs: List[Logs] = [
Logs(
timestamp=now - timedelta(minutes=3),
resources={"service.name": "test"},
attributes={"code.file": "test.py"},
body="Test log",
severity_text="INFO",
),
]
insert_logs(logs)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"stepInterval": 60,
"disabled": False,
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
"functions": [{"name": "fillZero"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0].get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) >= 1
values = series[0]["values"]
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
assert_minutely_bucket_values(
values,
now,
expected_by_ts={ts_min_3: 1},
context="logs/fillZero",
)
def test_logs_fill_zero_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Test fillZero function for logs with groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
logs: List[Logs] = [
Logs(
timestamp=now - timedelta(minutes=3),
resources={"service.name": "service-a"},
attributes={"code.file": "test.py"},
body="Log A",
severity_text="INFO",
),
Logs(
timestamp=now - timedelta(minutes=2),
resources={"service.name": "service-b"},
attributes={"code.file": "test.py"},
body="Log B",
severity_text="ERROR",
),
]
insert_logs(logs)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"stepInterval": 60,
"disabled": False,
"groupBy": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
"functions": [{"name": "fillZero"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0]["aggregations"]
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) == 2, "Expected 2 series for 2 service groups"
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
series_by_service = index_series_by_label(series, "service.name")
assert set(series_by_service.keys()) == {"service-a", "service-b"}
expectations = {
"service-a": {ts_min_3: 1},
"service-b": {ts_min_2: 1},
}
for service_name, s in series_by_service.items():
assert_minutely_bucket_values(
s["values"],
now,
expected_by_ts=expectations[service_name],
context=f"logs/fillZero/{service_name}",
)
def test_logs_fill_zero_formula(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Test fillZero function for logs with formula.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
logs: List[Logs] = [
Logs(
timestamp=now - timedelta(minutes=3),
resources={"service.name": "test"},
attributes={"code.file": "test.py"},
body="Test log",
severity_text="INFO",
),
Logs(
timestamp=now - timedelta(minutes=2),
resources={"service.name": "another-test"},
attributes={"code.file": "test.py"},
body="Another log",
severity_text="INFO",
),
]
insert_logs(logs)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"stepInterval": 60,
"disabled": True,
"filter": {"expression": "service.name = 'test'"},
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "logs",
"stepInterval": 60,
"disabled": True,
"filter": {"expression": "service.name = 'another-test'"},
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_formula",
"spec": {
"name": "F1",
"expression": "A + B",
"disabled": False,
"functions": [{"name": "fillZero"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
f1 = find_named_result(results, "F1")
assert f1 is not None, "Expected formula result named F1"
aggregations = f1.get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) >= 1
values = series[0]["values"]
assert_minutely_bucket_values(
values,
now,
expected_by_ts={ts_min_3: 1, ts_min_2: 1},
context="logs/fillZero/F1",
)
def test_logs_fill_zero_formula_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Test fillZero function for logs with formula and groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
logs: List[Logs] = [
Logs(
timestamp=now - timedelta(minutes=3),
resources={"service.name": "group1"},
attributes={"code.file": "test.py"},
body="Test log",
severity_text="INFO",
),
Logs(
timestamp=now - timedelta(minutes=2),
resources={"service.name": "group2"},
attributes={"code.file": "test.py"},
body="Test log 2",
severity_text="INFO",
),
]
insert_logs(logs)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"stepInterval": 60,
"disabled": True,
"groupBy": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "logs",
"stepInterval": 60,
"disabled": True,
"groupBy": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_formula",
"spec": {
"name": "F1",
"expression": "A + B",
"disabled": False,
"functions": [{"name": "fillZero"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
f1 = find_named_result(results, "F1")
assert f1 is not None, "Expected formula result named F1"
aggregations = f1.get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) == 2
series_by_service = index_series_by_label(series, "service.name")
assert set(series_by_service.keys()) == {"group1", "group2"}
expectations = {
"group1": {ts_min_3: 2},
"group2": {ts_min_2: 2},
}
for service_name, s in series_by_service.items():
assert_minutely_bucket_values(
s["values"],
now,
expected_by_ts=expectations[service_name],
context=f"logs/fillZero/F1/{service_name}",
)

View File

@@ -1,12 +1,17 @@
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Callable, List
from typing import Callable, Dict, List
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
from src.querier.timeseries_utils import (
assert_minutely_bucket_values,
find_named_result,
index_series_by_label,
)
def test_traces_list(
@@ -413,3 +418,885 @@ def test_traces_list(
values = response.json()["data"]["values"]["stringValues"]
assert set(values) == set(["topic-service", "http-service"])
def test_traces_fill_gaps(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Test fillGaps for traces without groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
trace_id = TraceIdGenerator.trace_id()
traces: List[Traces] = [
Traces(
timestamp=now - timedelta(minutes=3),
duration=timedelta(seconds=1),
trace_id=trace_id,
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="test-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "test-service"},
attributes={"http.method": "GET"},
),
]
insert_traces(traces)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": False,
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": True},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0]["aggregations"]
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) >= 1
values = series[0]["values"]
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
assert_minutely_bucket_values(
values,
now,
expected_by_ts={ts_min_3: 1},
context="traces/fillGaps",
)
def test_traces_fill_gaps_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Test fillGaps for traces with groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
traces: List[Traces] = [
Traces(
timestamp=now - timedelta(minutes=3),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="span-a",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "service-a"},
attributes={"http.method": "GET"},
),
Traces(
timestamp=now - timedelta(minutes=2),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="span-b",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "service-b"},
attributes={"http.method": "POST"},
),
]
insert_traces(traces)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": False,
"groupBy": [
{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}
],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": True},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0]["aggregations"]
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) == 2, "Expected 2 series for 2 service groups"
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
series_by_service = index_series_by_label(series, "service.name")
assert set(series_by_service.keys()) == {"service-a", "service-b"}
expectations: Dict[str, Dict[int, float]] = {
"service-a": {ts_min_3: 1},
"service-b": {ts_min_2: 1},
}
for service_name, s in series_by_service.items():
assert_minutely_bucket_values(
s["values"],
now,
expected_by_ts=expectations[service_name],
context=f"traces/fillGaps/{service_name}",
)
def test_traces_fill_gaps_formula(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Test fillGaps for traces with formula.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
traces: List[Traces] = [
Traces(
timestamp=now - timedelta(minutes=3),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="test-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "test"},
attributes={"http.method": "GET"},
),
Traces(
timestamp=now - timedelta(minutes=2),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="another-test-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "another-test"},
attributes={"http.method": "POST"},
),
]
insert_traces(traces)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": True,
"filter": {"expression": "service.name = 'test'"},
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "traces",
"stepInterval": 60,
"disabled": True,
"filter": {"expression": "service.name = 'another-test'"},
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_formula",
"spec": {
"name": "F1",
"expression": "A + B",
"disabled": False,
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": True},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
f1 = find_named_result(results, "F1")
assert f1 is not None, "Expected formula result named F1"
aggregations = f1.get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) >= 1
assert_minutely_bucket_values(
series[0]["values"],
now,
expected_by_ts={ts_min_3: 1, ts_min_2: 1},
context="traces/fillGaps/F1",
)
def test_traces_fill_gaps_formula_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Test fillGaps for traces with formula and groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
traces: List[Traces] = [
Traces(
timestamp=now - timedelta(minutes=3),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="span-group1",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "group1"},
attributes={"http.method": "GET"},
),
Traces(
timestamp=now - timedelta(minutes=2),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="span-group2",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "group2"},
attributes={"http.method": "POST"},
),
]
insert_traces(traces)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": True,
"groupBy": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "traces",
"stepInterval": 60,
"disabled": True,
"groupBy": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_formula",
"spec": {
"name": "F1",
"expression": "A + B",
"disabled": False,
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": True},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
f1 = find_named_result(results, "F1")
assert f1 is not None, "Expected formula result named F1"
aggregations = f1.get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) == 2
series_by_service = index_series_by_label(series, "service.name")
assert set(series_by_service.keys()) == {"group1", "group2"}
expectations: Dict[str, Dict[int, float]] = {
"group1": {ts_min_3: 2},
"group2": {ts_min_2: 2},
}
for service_name, s in series_by_service.items():
assert_minutely_bucket_values(
s["values"],
now,
expected_by_ts=expectations[service_name],
context=f"traces/fillGaps/F1/{service_name}",
)
def test_traces_fill_zero(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Test fillZero function for traces without groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
traces: List[Traces] = [
Traces(
timestamp=now - timedelta(minutes=3),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="test-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "test"},
attributes={"http.method": "GET"},
),
]
insert_traces(traces)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": False,
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
"functions": [{"name": "fillZero"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0].get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) >= 1
values = series[0]["values"]
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
assert_minutely_bucket_values(
values,
now,
expected_by_ts={ts_min_3: 1},
context="traces/fillZero",
)
def test_traces_fill_zero_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Test fillZero function for traces with groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
traces: List[Traces] = [
Traces(
timestamp=now - timedelta(minutes=3),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="span-a",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "service-a"},
attributes={"http.method": "GET"},
),
Traces(
timestamp=now - timedelta(minutes=2),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="span-b",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "service-b"},
attributes={"http.method": "POST"},
),
]
insert_traces(traces)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": False,
"groupBy": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
"functions": [{"name": "fillZero"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0]["aggregations"]
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) == 2, "Expected 2 series for 2 service groups"
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
series_by_service = index_series_by_label(series, "service.name")
assert set(series_by_service.keys()) == {"service-a", "service-b"}
expectations: Dict[str, Dict[int, float]] = {
"service-a": {ts_min_3: 1},
"service-b": {ts_min_2: 1},
}
for service_name, s in series_by_service.items():
assert_minutely_bucket_values(
s["values"],
now,
expected_by_ts=expectations[service_name],
context=f"traces/fillZero/{service_name}",
)
def test_traces_fill_zero_formula(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Test fillZero function for traces with formula.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
traces: List[Traces] = [
Traces(
timestamp=now - timedelta(minutes=3),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="test-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "test"},
attributes={"http.method": "GET"},
),
Traces(
timestamp=now - timedelta(minutes=2),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="another-test-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "another-test"},
attributes={"http.method": "POST"},
),
]
insert_traces(traces)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": True,
"filter": {"expression": "service.name = 'test'"},
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "traces",
"stepInterval": 60,
"disabled": True,
"filter": {"expression": "service.name = 'another-test'"},
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_formula",
"spec": {
"name": "F1",
"expression": "A + B",
"disabled": False,
"functions": [{"name": "fillZero"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
f1 = find_named_result(results, "F1")
assert f1 is not None, "Expected formula result named F1"
aggregations = f1.get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) >= 1
assert_minutely_bucket_values(
series[0]["values"],
now,
expected_by_ts={ts_min_3: 1, ts_min_2: 1},
context="traces/fillZero/F1",
)
def test_traces_fill_zero_formula_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Test fillZero function for traces with formula and groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
traces: List[Traces] = [
Traces(
timestamp=now - timedelta(minutes=3),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="span-group1",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "group1"},
attributes={"http.method": "GET"},
),
Traces(
timestamp=now - timedelta(minutes=2),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="span-group2",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "group2"},
attributes={"http.method": "POST"},
),
]
insert_traces(traces)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": True,
"groupBy": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "traces",
"stepInterval": 60,
"disabled": True,
"groupBy": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_formula",
"spec": {
"name": "F1",
"expression": "A + B",
"disabled": False,
"functions": [{"name": "fillZero"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
f1 = find_named_result(results, "F1")
assert f1 is not None, "Expected formula result named F1"
aggregations = f1.get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) == 2
series_by_service = index_series_by_label(series, "service.name")
assert set(series_by_service.keys()) == {"group1", "group2"}
expectations: Dict[str, Dict[int, float]] = {
"group1": {ts_min_3: 2},
"group2": {ts_min_2: 2},
}
for service_name, s in series_by_service.items():
assert_minutely_bucket_values(
s["values"],
now,
expected_by_ts=expectations[service_name],
context=f"traces/fillZero/F1/{service_name}",
)

View File

@@ -0,0 +1,906 @@
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Callable, Dict, List
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.metrics import Metrics
from src.querier.timeseries_utils import (
assert_minutely_bucket_values,
find_named_result,
index_series_by_label,
)
def test_metrics_fill_gaps(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
"""
Test fillGaps for metrics without groupBy.
Verifies that gaps in time series are filled with zeros.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
metric_name = "test_fill_gaps_metric"
# Insert metrics at minute 3 and minute 1 (gap at minute 2)
metrics: List[Metrics] = [
Metrics(
metric_name=metric_name,
labels={"service": "test-service"},
timestamp=now - timedelta(minutes=3),
value=10.0,
temporality="Cumulative",
),
Metrics(
metric_name=metric_name,
labels={"service": "test-service"},
timestamp=now - timedelta(minutes=1),
value=30.0,
temporality="Cumulative",
),
]
insert_metrics(metrics)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{
"metricName": metric_name,
"temporality": "cumulative",
"timeAggregation": "increase",
"spaceAggregation": "sum"
}],
"stepInterval": 60,
"disabled": False,
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": True},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0]["aggregations"]
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) >= 1
values = series[0]["values"]
ts_min_1 = int((now - timedelta(minutes=1)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
assert_minutely_bucket_values(
values,
now,
expected_by_ts={ts_min_3: 10.0, ts_min_1: 20.0},
context="metrics/fillGaps",
)
def test_metrics_fill_gaps_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
"""
Test fillGaps for metrics with groupBy.
Verifies gaps are filled per group.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
metric_name = "test_fill_gaps_grouped_metric"
metrics: List[Metrics] = [
Metrics(
metric_name=metric_name,
labels={"my_tag": "service-a"},
timestamp=now - timedelta(minutes=3),
value=10.0,
temporality="Cumulative",
),
Metrics(
metric_name=metric_name,
labels={"my_tag": "service-b"},
timestamp=now - timedelta(minutes=2),
value=20.0,
temporality="Cumulative",
),
]
insert_metrics(metrics)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{
"metricName": metric_name,
"temporality": "cumulative",
"timeAggregation": "increase",
"spaceAggregation": "sum"
}],
"stepInterval": 60,
"disabled": False,
"groupBy": [{"name": "my_tag", "fieldDataType": "string", "fieldContext": "attribute"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": True},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0]["aggregations"]
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) >= 1, "Expected at least one series"
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
series_by_tag = index_series_by_label(series, "my_tag")
assert set(series_by_tag.keys()) == {"service-a", "service-b"}
expectations: Dict[str, Dict[int, float]] = {
"service-a": {ts_min_3: 10.0},
"service-b": {ts_min_2: 20.0},
}
for tag_value, s in series_by_tag.items():
assert_minutely_bucket_values(
s["values"],
now,
expected_by_ts=expectations[tag_value],
context=f"metrics/fillGaps/{tag_value}",
)
def test_metrics_fill_gaps_formula(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
"""
Test fillGaps for metrics with formula.
Verifies formula results have gaps filled.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
metric_name_a = "test_formula_metric_a"
metric_name_b = "test_formula_metric_b"
metrics: List[Metrics] = [
Metrics(
metric_name=metric_name_a,
labels={"service": "test"},
timestamp=now - timedelta(minutes=3),
value=100.0,
temporality="Cumulative",
),
Metrics(
metric_name=metric_name_b,
labels={"service": "test"},
timestamp=now - timedelta(minutes=2),
value=10.0,
temporality="Cumulative",
),
]
insert_metrics(metrics)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{
"metricName": metric_name_a,
"temporality": "cumulative",
"timeAggregation": "increase",
"spaceAggregation": "sum"
}],
"stepInterval": 60,
"disabled": True,
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "metrics",
"aggregations": [{
"metricName": metric_name_b,
"temporality": "cumulative",
"timeAggregation": "increase",
"spaceAggregation": "sum"
}],
"stepInterval": 60,
"disabled": True,
},
},
{
"type": "builder_formula",
"spec": {
"name": "F1",
"expression": "A + B",
"disabled": False,
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": True},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
f1 = find_named_result(results, "F1")
assert f1 is not None, "Expected formula result named F1"
aggregations = f1.get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0].get("series") or []
assert len(series) >= 1, f"Expected at least one series for F1, got {aggregations[0]}"
assert_minutely_bucket_values(
series[0]["values"],
now,
expected_by_ts={ts_min_3: 100.0, ts_min_2: 10.0},
context="metrics/fillGaps/F1",
)
def test_metrics_fill_gaps_formula_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
"""
Test fillGaps for metrics with formula and groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
metric_name_a = "test_formula_grouped_a"
metric_name_b = "test_formula_grouped_b"
metrics: List[Metrics] = [
Metrics(
metric_name=metric_name_a,
labels={"my_tag": "group1"},
timestamp=now - timedelta(minutes=3),
value=100.0,
temporality="Cumulative",
),
Metrics(
metric_name=metric_name_b,
labels={"my_tag": "group1"},
timestamp=now - timedelta(minutes=3),
value=10.0,
temporality="Cumulative",
),
Metrics(
metric_name=metric_name_a,
labels={"my_tag": "group2"},
timestamp=now - timedelta(minutes=2),
value=200.0,
temporality="Cumulative",
),
Metrics(
metric_name=metric_name_b,
labels={"my_tag": "group2"},
timestamp=now - timedelta(minutes=2),
value=20.0,
temporality="Cumulative",
),
]
insert_metrics(metrics)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{
"metricName": metric_name_a,
"temporality": "cumulative",
"timeAggregation": "increase",
"spaceAggregation": "sum"
}],
"stepInterval": 60,
"disabled": True,
"groupBy": [{"name": "my_tag", "fieldDataType": "string", "fieldContext": "attribute"}],
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "metrics",
"aggregations": [{
"metricName": metric_name_b,
"temporality": "cumulative",
"timeAggregation": "increase",
"spaceAggregation": "sum"
}],
"stepInterval": 60,
"disabled": True,
"groupBy": [{"name": "my_tag", "fieldDataType": "string", "fieldContext": "attribute"}],
},
},
{
"type": "builder_formula",
"spec": {
"name": "F1",
"expression": "A + B",
"disabled": False,
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": True},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
f1 = find_named_result(results, "F1")
assert f1 is not None, "Expected formula result named F1"
aggregations = f1.get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) == 2
series_by_group = index_series_by_label(series, "my_tag")
assert set(series_by_group.keys()) == {"group1", "group2"}
expectations: Dict[str, Dict[int, float]] = {
"group1": {ts_min_3: 110.0},
"group2": {ts_min_2: 220.0},
}
for group, s in series_by_group.items():
assert_minutely_bucket_values(
s["values"],
now,
expected_by_ts=expectations[group],
context=f"metrics/fillGaps/F1/{group}",
)
def test_metrics_fill_zero(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
"""
Test fillZero function for metrics without groupBy.
Verifies the fillZero function fills gaps with zeros.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
metric_name = "test_fill_zero_metric"
metrics: List[Metrics] = [
Metrics(
metric_name=metric_name,
labels={"service": "test"},
timestamp=now - timedelta(minutes=3),
value=10.0,
temporality="Cumulative",
),
]
insert_metrics(metrics)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{
"metricName": metric_name,
"temporality": "cumulative",
"timeAggregation": "increase",
"spaceAggregation": "sum"
}],
"stepInterval": 60,
"disabled": False,
"functions": [{"name": "fillZero"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0].get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) >= 1
values = series[0]["values"]
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
assert_minutely_bucket_values(
values,
now,
expected_by_ts={ts_min_3: 10.0},
context="metrics/fillZero",
)
def test_metrics_fill_zero_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
"""
Test fillZero function for metrics with groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
metric_name = "test_fill_zero_grouped"
metrics: List[Metrics] = [
Metrics(
metric_name=metric_name,
labels={"my_tag": "service-a"},
timestamp=now - timedelta(minutes=3),
value=10.0,
temporality="Cumulative",
),
Metrics(
metric_name=metric_name,
labels={"my_tag": "service-b"},
timestamp=now - timedelta(minutes=2),
value=20.0,
temporality="Cumulative",
),
]
insert_metrics(metrics)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{
"metricName": metric_name,
"temporality": "cumulative",
"timeAggregation": "increase",
"spaceAggregation": "sum"
}],
"stepInterval": 60,
"disabled": False,
"groupBy": [{"name": "my_tag", "fieldDataType": "string", "fieldContext": "attribute"}],
"functions": [{"name": "fillZero"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) >= 1
aggregations = results[0].get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) == 2
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
series_by_tag = index_series_by_label(series, "my_tag")
assert set(series_by_tag.keys()) == {"service-a", "service-b"}
expectations: Dict[str, Dict[int, float]] = {
"service-a": {ts_min_3: 10.0},
"service-b": {ts_min_2: 20.0},
}
for tag_value, s in series_by_tag.items():
assert_minutely_bucket_values(
s["values"],
now,
expected_by_ts=expectations[tag_value],
context=f"metrics/fillZero/{tag_value}",
)
def test_metrics_fill_zero_formula(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
"""
Test fillZero function for metrics with formula.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
metric_name_a = "test_fz_formula_a"
metric_name_b = "test_fz_formula_b"
metrics: List[Metrics] = [
Metrics(
metric_name=metric_name_a,
labels={"service": "test"},
timestamp=now - timedelta(minutes=3),
value=100.0,
temporality="Cumulative",
),
Metrics(
metric_name=metric_name_b,
labels={"service": "test"},
timestamp=now - timedelta(minutes=2),
value=10.0,
temporality="Cumulative",
),
]
insert_metrics(metrics)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{
"metricName": metric_name_a,
"temporality": "cumulative",
"timeAggregation": "increase",
"spaceAggregation": "sum"
}],
"stepInterval": 60,
"disabled": True,
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "metrics",
"aggregations": [{
"metricName": metric_name_b,
"temporality": "cumulative",
"timeAggregation": "increase",
"spaceAggregation": "sum"
}],
"stepInterval": 60,
"disabled": True,
},
},
{
"type": "builder_formula",
"spec": {
"name": "F1",
"expression": "A + B",
"disabled": False,
"functions": [{"name": "fillZero"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
f1 = find_named_result(results, "F1")
assert f1 is not None, "Expected formula result named F1"
aggregations = f1.get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0].get("series") or []
assert len(series) >= 1, (
f"Expected at least one series for F1, got {aggregations[0]}"
)
assert_minutely_bucket_values(
series[0]["values"],
now,
expected_by_ts={ts_min_3: 100.0, ts_min_2: 10.0},
context="metrics/fillZero/F1",
)
def test_metrics_fill_zero_formula_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
"""
Test fillZero function for metrics with formula and groupBy.
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
metric_name_a = "test_fz_formula_grp_a"
metric_name_b = "test_fz_formula_grp_b"
metrics: List[Metrics] = [
Metrics(
metric_name=metric_name_a,
labels={"my_tag": "group1"},
timestamp=now - timedelta(minutes=3),
value=100.0,
temporality="Cumulative",
),
Metrics(
metric_name=metric_name_b,
labels={"my_tag": "group1"},
timestamp=now - timedelta(minutes=3),
value=10.0,
temporality="Cumulative",
),
Metrics(
metric_name=metric_name_a,
labels={"my_tag": "group2"},
timestamp=now - timedelta(minutes=2),
value=200.0,
temporality="Cumulative",
),
Metrics(
metric_name=metric_name_b,
labels={"my_tag": "group2"},
timestamp=now - timedelta(minutes=2),
value=20.0,
temporality="Cumulative",
),
]
insert_metrics(metrics)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{
"metricName": metric_name_a,
"temporality": "cumulative",
"timeAggregation": "increase",
"spaceAggregation": "sum"
}],
"stepInterval": 60,
"disabled": True,
"groupBy": [{"name": "my_tag", "fieldDataType": "string", "fieldContext": "attribute"}],
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "metrics",
"aggregations": [{
"metricName": metric_name_b,
"temporality": "cumulative",
"timeAggregation": "increase",
"spaceAggregation": "sum"
}],
"stepInterval": 60,
"disabled": True,
"groupBy": [{"name": "my_tag", "fieldDataType": "string", "fieldContext": "attribute"}],
},
},
{
"type": "builder_formula",
"spec": {
"name": "F1",
"expression": "A + B",
"disabled": False,
"functions": [{"name": "fillZero"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1 # Only F1 (A and B are disabled)
ts_min_2 = int((now - timedelta(minutes=2)).timestamp() * 1000)
ts_min_3 = int((now - timedelta(minutes=3)).timestamp() * 1000)
f1 = find_named_result(results, "F1")
assert f1 is not None, "Expected formula result named F1"
aggregations = f1.get("aggregations") or []
assert len(aggregations) == 1
series = aggregations[0]["series"]
assert len(series) == 2
series_by_group = index_series_by_label(series, "my_tag")
assert set(series_by_group.keys()) == {"group1", "group2"}
expectations: Dict[str, Dict[int, float]] = {
"group1": {ts_min_3: 110.0},
"group2": {ts_min_2: 220.0},
}
for group, s in series_by_group.items():
assert_minutely_bucket_values(
s["values"],
now,
expected_by_ts=expectations[group],
context=f"metrics/fillZero/F1/{group}",
)

View File

@@ -0,0 +1,77 @@
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
def expected_minutely_bucket_timestamps_ms(now: datetime) -> List[List[int]]:
previous_five = [
int((now - timedelta(minutes=m)).timestamp() * 1000)
for m in range(5, 0, -1)
]
with_current = previous_five + [int(now.timestamp() * 1000)]
return [previous_five, with_current]
def assert_minutely_bucket_timestamps(
points: List[Dict[str, Any]],
now: datetime,
*,
context: str,
) -> List[int]:
expected = expected_minutely_bucket_timestamps_ms(now)
actual = [p["timestamp"] for p in points]
assert actual in expected, f"Unexpected timestamps for {context}: {actual}"
return actual
def assert_minutely_bucket_values(
points: List[Dict[str, Any]],
now: datetime,
*,
expected_by_ts: Dict[int, float],
context: str,
) -> None:
timestamps = assert_minutely_bucket_timestamps(points, now, context=context)
expected = {ts: 0 for ts in timestamps}
expected.update(expected_by_ts)
for point in points:
ts = point["timestamp"]
assert point["value"] == expected[ts], (
f"Unexpected value for {context} at timestamp={ts}: "
f"got {point['value']}, expected {expected[ts]}"
)
def index_series_by_label(
series: List[Dict[str, Any]],
label_name: str,
) -> Dict[str, Dict[str, Any]]:
series_by_label: Dict[str, Dict[str, Any]] = {}
for s in series:
label = next(
(
l
for l in s.get("labels", [])
if l.get("key", {}).get("name") == label_name
),
None,
)
assert label is not None, f"Expected {label_name} label in series"
series_by_label[label["value"]] = s
return series_by_label
def find_named_result(
results: List[Dict[str, Any]],
name: str,
) -> Optional[Dict[str, Any]]:
return next(
(
r
for r in results
if r.get("name") == name
or r.get("queryName") == name
or (r.get("spec") or {}).get("name") == name
),
None,
)