import datetime
import json
from abc import ABC
from typing import Any, Callable, Generator, List, Optional
import numpy as np
import pytest
from ksuid import KsuidMs
from fixtures import types
from fixtures.fingerprint import LogsOrTracesFingerprint
from fixtures.utils import parse_timestamp
class LogsResource(ABC):
labels: str
fingerprint: str
seen_at_ts_bucket_start: np.int64
def __init__(
self,
labels: dict[str, str],
fingerprint: str,
seen_at_ts_bucket_start: np.int64,
) -> None:
self.labels = json.dumps(
labels, separators=(",", ":")
)  # ClickHouse treats {"a": "b"} differently from {"a":"b"}: with the extra space it cannot apply JSON functions to the value
self.fingerprint = fingerprint
self.seen_at_ts_bucket_start = seen_at_ts_bucket_start
def np_arr(self) -> np.array:
return np.array(
[
self.labels,
self.fingerprint,
self.seen_at_ts_bucket_start,
]
)
class LogsResourceOrAttributeKeys(ABC):
name: str
datatype: str
def __init__(self, name: str, datatype: str) -> None:
self.name = name
self.datatype = datatype
def np_arr(self) -> np.array:
return np.array([self.name, self.datatype])
class LogsTagAttributes(ABC):
unix_milli: np.int64
tag_key: str
tag_type: str
tag_data_type: str
string_value: str
number_value: np.float64
def __init__(
self,
timestamp: datetime.datetime,
tag_key: str,
tag_type: str,
tag_data_type: str,
string_value: Optional[str],
number_value: np.float64,
) -> None:
self.unix_milli = np.int64(int(timestamp.timestamp() * 1e3))
self.tag_key = tag_key
self.tag_type = tag_type
self.tag_data_type = tag_data_type
self.string_value = string_value or ""
self.number_value = number_value
def np_arr(self) -> np.array:
return np.array(
[
self.unix_milli,
self.tag_key,
self.tag_type,
self.tag_data_type,
self.string_value,
self.number_value,
]
)
class Logs(ABC):
ts_bucket_start: np.uint64
resource_fingerprint: str
timestamp: np.uint64
observed_timestamp: np.uint64
id: str
trace_id: str
span_id: str
trace_flags: np.uint32
severity_text: str
severity_number: np.uint8
body: str
attributes_string: dict[str, str]
attributes_number: dict[str, np.float64]
attributes_bool: dict[str, bool]
resources_string: dict[str, str]
scope_name: str
scope_version: str
scope_string: dict[str, str]
resource: List[LogsResource]
tag_attributes: List[LogsTagAttributes]
resource_keys: List[LogsResourceOrAttributeKeys]
attribute_keys: List[LogsResourceOrAttributeKeys]
def __init__(
self,
timestamp: Optional[datetime.datetime] = None,
resources: dict[str, Any] = {},
attributes: dict[str, Any] = {},
body: str = "default body",
severity_text: str = "INFO",
trace_id: str = "",
span_id: str = "",
trace_flags: np.uint32 = 0,
scope_name: str = "",
scope_version: str = "",
scope_attributes: dict[str, str] = {},
) -> None:
if timestamp is None:
timestamp = datetime.datetime.now()
self.tag_attributes = []
self.attribute_keys = []
self.resource_keys = []
# Convert timestamp to uint64 nanoseconds
self.timestamp = np.uint64(int(timestamp.timestamp() * 1e9))
self.observed_timestamp = self.timestamp
# Calculate ts_bucket_start (30mins bucket)
# Round down to nearest 30-minute interval
minute = timestamp.minute
if minute < 30:
bucket_minute = 0
else:
bucket_minute = 30
bucket_start = timestamp.replace(minute=bucket_minute, second=0, microsecond=0)
self.ts_bucket_start = np.uint64(int(bucket_start.timestamp()))
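# Worked example: a log at 12:47:31 falls into the bucket that starts at
# 12:30:00, while one at 12:14:05 falls into the 12:00:00 bucket.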
# Generate ksuid by using the timestamp
self.id = str(KsuidMs(datetime=timestamp))
# Initialize trace fields
self.trace_id = trace_id
self.span_id = span_id
self.trace_flags = trace_flags
# Set severity fields
self.severity_text = severity_text
self.severity_number = self._get_severity_number(severity_text)
# Set body
self.body = body
# Process resources and attributes
self.resources_string = {k: str(v) for k, v in resources.items()}
for k, v in self.resources_string.items():
self.tag_attributes.append(
LogsTagAttributes(
timestamp=timestamp,
tag_key=k,
tag_type="resource",
tag_data_type="string",
string_value=v,
number_value=None,
)
)
self.resource_keys.append(
LogsResourceOrAttributeKeys(name=k, datatype="string")
)
# Calculate resource fingerprint
self.resource_fingerprint = LogsOrTracesFingerprint(
self.resources_string
).calculate()
# Process attributes by type
self.attributes_string = {}
self.attributes_number = {}
self.attributes_bool = {}
for k, v in attributes.items():
if isinstance(v, bool):
self.attributes_bool[k] = v
self.tag_attributes.append(
LogsTagAttributes(
timestamp=timestamp,
tag_key=k,
tag_type="tag",
tag_data_type="bool",
string_value=None,
number_value=None,
)
)
self.attribute_keys.append(
LogsResourceOrAttributeKeys(name=k, datatype="bool")
)
elif isinstance(v, (int, float)):
self.attributes_number[k] = np.float64(v)
self.tag_attributes.append(
LogsTagAttributes(
timestamp=timestamp,
tag_key=k,
tag_type="tag",
tag_data_type="float64",
string_value=None,
number_value=np.float64(v),
)
)
self.attribute_keys.append(
LogsResourceOrAttributeKeys(name=k, datatype="float64")
)
else:
self.attributes_string[k] = str(v)
self.tag_attributes.append(
LogsTagAttributes(
timestamp=timestamp,
tag_key=k,
tag_type="tag",
tag_data_type="string",
string_value=str(v),
number_value=None,
)
)
self.attribute_keys.append(
LogsResourceOrAttributeKeys(name=k, datatype="string")
)
# Initialize scope fields
self.scope_name = scope_name
self.scope_version = scope_version
self.scope_string = {k: str(v) for k, v in scope_attributes.items()}
for k, v in self.scope_string.items():
self.tag_attributes.append(
LogsTagAttributes(
timestamp=timestamp,
tag_key=k,
tag_type="scope",
tag_data_type="string",
string_value=v,
number_value=None,
)
)
self.resource = []
self.resource.append(
LogsResource(
labels=self.resources_string,
fingerprint=self.resource_fingerprint,
seen_at_ts_bucket_start=self.ts_bucket_start,
)
)
# Log fields (severity)
self.tag_attributes.append(
LogsTagAttributes(
timestamp=timestamp,
tag_key="severity_text",
tag_type="logfield",
tag_data_type="string",
string_value=self.severity_text,
number_value=None,
)
)
self.attribute_keys.append(
LogsResourceOrAttributeKeys(name="severity_text", datatype="string")
)
self.tag_attributes.append(
LogsTagAttributes(
timestamp=timestamp,
tag_key="severity_number",
tag_type="logfield",
tag_data_type="float64",
string_value=None,
number_value=float(self.severity_number),
)
)
self.attribute_keys.append(
LogsResourceOrAttributeKeys(name="severity_number", datatype="float64")
)
def _get_severity_number(self, severity_text: str) -> np.uint8:
"""Convert severity text to numeric value"""
severity_map = {
"TRACE": 1,
"DEBUG": 5,
"INFO": 9,
"WARN": 13,
"ERROR": 17,
"FATAL": 21,
}
return np.uint8(severity_map.get(severity_text.upper(), 9)) # Default to INFO
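# For example, _get_severity_number("warn") returns 13, and an unknown value
# such as "VERBOSE" falls back to 9 (INFO).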
def np_arr(self) -> np.array:
"""Return log data as numpy array for database insertion"""
return np.array(
[
self.ts_bucket_start,
self.resource_fingerprint,
self.timestamp,
self.observed_timestamp,
self.id,
self.trace_id,
self.span_id,
self.trace_flags,
self.severity_text,
self.severity_number,
self.body,
self.attributes_string,
self.attributes_number,
self.attributes_bool,
self.resources_string,
self.scope_name,
self.scope_version,
self.scope_string,
self.resources_string,
]
)
@classmethod
def from_dict(
cls,
data: dict,
) -> "Logs":
"""Create a Logs instance from a dict."""
# parse timestamp from iso format
timestamp = parse_timestamp(data["timestamp"])
return cls(
timestamp=timestamp,
resources=data.get("resources", {}),
attributes=data.get("attributes", {}),
body=data["body"],
severity_text=data.get("severity_text", "INFO"),
)
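# Illustrative record shape accepted by from_dict (and hence by the JSONL
# test-data files); only "timestamp" and "body" are required, the other keys
# fall back to the defaults above. The field values here are made up:
#
#   {
#       "timestamp": "2024-01-01T10:00:00Z",
#       "body": "user login failed",
#       "severity_text": "ERROR",
#       "resources": {"service.name": "auth"},
#       "attributes": {"http.status_code": 401}
#   }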
@classmethod
def load_from_file(
cls,
file_path: str,
base_time: Optional[datetime.datetime] = None,
) -> List["Logs"]:
"""Load logs from a JSONL file."""
data_list = []
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
data_list.append(json.loads(line))
# If base_time provided, calculate time offset
time_offset = datetime.timedelta(0)
if base_time is not None:
# Find earliest timestamp
earliest = None
for data in data_list:
ts = parse_timestamp(data["timestamp"])
if earliest is None or ts < earliest:
earliest = ts
if earliest is not None:
time_offset = base_time - earliest
logs = []
for data in data_list:
original_ts = parse_timestamp(data["timestamp"])
adjusted_ts = original_ts + time_offset
data["timestamp"] = adjusted_ts.isoformat()
logs.append(cls.from_dict(data))
return logs
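# Usage sketch (illustrative): rebase every record in a JSONL file so that the
# earliest log lands 30 minutes before "now". The file path is hypothetical.
#
#   base = datetime.datetime.now() - datetime.timedelta(minutes=30)
#   logs = Logs.load_from_file("testdata/alerts/logs.jsonl", base_time=base)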
@pytest.fixture(name="insert_logs", scope="function")
def insert_logs(
clickhouse: types.TestContainerClickhouse,
) -> Generator[Callable[[List[Logs]], None], Any, None]:
def _insert_logs(logs: List[Logs]) -> None:
"""
Insert logs into ClickHouse tables following the same logic as the Go exporter.
This function handles insertion into multiple tables:
- distributed_logs_v2 (main logs table)
- distributed_logs_v2_resource (resource fingerprints)
- distributed_tag_attributes_v2 (tag attributes)
- distributed_logs_attribute_keys (attribute keys)
- distributed_logs_resource_keys (resource keys)
"""
resources: List[LogsResource] = []
for log in logs:
resources.extend(log.resource)
if len(resources) > 0:
clickhouse.conn.insert(
database="signoz_logs",
table="distributed_logs_v2_resource",
data=[resource.np_arr() for resource in resources],
column_names=[
"labels",
"fingerprint",
"seen_at_ts_bucket_start",
],
)
tag_attributes: List[LogsTagAttributes] = []
for log in logs:
tag_attributes.extend(log.tag_attributes)
if len(tag_attributes) > 0:
clickhouse.conn.insert(
database="signoz_logs",
table="distributed_tag_attributes_v2",
data=[tag_attribute.np_arr() for tag_attribute in tag_attributes],
)
attribute_keys: List[LogsResourceOrAttributeKeys] = []
for log in logs:
attribute_keys.extend(log.attribute_keys)
if len(attribute_keys) > 0:
clickhouse.conn.insert(
database="signoz_logs",
table="distributed_logs_attribute_keys",
data=[attribute_key.np_arr() for attribute_key in attribute_keys],
)
resource_keys: List[LogsResourceOrAttributeKeys] = []
for log in logs:
resource_keys.extend(log.resource_keys)
if len(resource_keys) > 0:
clickhouse.conn.insert(
database="signoz_logs",
table="distributed_logs_resource_keys",
data=[resource_key.np_arr() for resource_key in resource_keys],
)
clickhouse.conn.insert(
database="signoz_logs",
table="distributed_logs_v2",
data=[log.np_arr() for log in logs],
column_names=[
"ts_bucket_start",
"resource_fingerprint",
"timestamp",
"observed_timestamp",
"id",
"trace_id",
"span_id",
"trace_flags",
"severity_text",
"severity_number",
"body",
"attributes_string",
"attributes_number",
"attributes_bool",
"resources_string",
"scope_name",
"scope_version",
"scope_string",
"resource",
],
)
yield _insert_logs
clickhouse.conn.query(
f"TRUNCATE TABLE signoz_logs.logs_v2 ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
)
clickhouse.conn.query(
f"TRUNCATE TABLE signoz_logs.logs_v2_resource ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
)
clickhouse.conn.query(
f"TRUNCATE TABLE signoz_logs.tag_attributes_v2 ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
)
clickhouse.conn.query(
f"TRUNCATE TABLE signoz_logs.logs_attribute_keys ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
)
clickhouse.conn.query(
f"TRUNCATE TABLE signoz_logs.logs_resource_keys ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
)
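# Usage sketch (illustrative): a test requests the insert_logs fixture and hands
# it a list of Logs objects; the fixture writes them to the ClickHouse tables
# listed in the docstring above and truncates those tables on teardown. The test
# name and field values below are hypothetical.
#
#   def test_error_logs_are_inserted(insert_logs) -> None:
#       insert_logs(
#           [
#               Logs(
#                   body="payment failed",
#                   severity_text="ERROR",
#                   resources={"service.name": "payment"},
#                   attributes={"duration_ms": 12.5, "retried": True},
#               )
#           ]
#       )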
@pytest.fixture(name="ttl_legacy_logs_v2_table_setup", scope="function")
def ttl_legacy_logs_v2_table_setup(request, signoz: types.SigNoz):
"""
Fixture to set up and tear down the legacy TTL test environment.
It renames the existing logs_v2 table to a backup name and creates a new, empty table for testing.
After the test, it restores the original table.
"""
# Setup code
result = signoz.telemetrystore.conn.query(
f"RENAME TABLE signoz_logs.logs_v2 TO signoz_logs.logs_v2_backup ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'"
).result_rows
assert result is not None
# Add cleanup to restore original table
request.addfinalizer(
lambda: signoz.telemetrystore.conn.query(
f"RENAME TABLE signoz_logs.logs_v2_backup TO signoz_logs.logs_v2 ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'"
)
)
# Create new test tables
result = signoz.telemetrystore.conn.query(
f"""CREATE TABLE signoz_logs.logs_v2 ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'
(
`id` String,
`timestamp` UInt64 CODEC(DoubleDelta, LZ4)
)
ENGINE = MergeTree()
ORDER BY id;"""
).result_rows
assert result is not None
# Add cleanup to drop test table
request.addfinalizer(
lambda: signoz.telemetrystore.conn.query(
f"DROP TABLE IF EXISTS signoz_logs.logs_v2 ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'"
)
)
yield # Test runs here
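# Usage sketch (illustrative): a TTL test can request this fixture so the
# rename/backup/restore steps above run around it and the test operates on the
# minimal stand-in logs_v2 table. The test name is hypothetical.
#
#   @pytest.mark.usefixtures("ttl_legacy_logs_v2_table_setup")
#   def test_legacy_ttl_on_logs_v2(signoz: types.SigNoz) -> None:
#       ...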
@pytest.fixture(name="ttl_legacy_logs_v2_resource_table_setup", scope="function")
def ttl_legacy_logs_v2_resource_table_setup(request, signoz: types.SigNoz):
"""
Fixture to set up and tear down the legacy TTL test environment.
It renames the existing logs_v2_resource table to a backup name and creates a new, empty table for testing.
After the test, it restores the original table.
"""
# Setup code
result = signoz.telemetrystore.conn.query(
f"RENAME TABLE signoz_logs.logs_v2_resource TO signoz_logs.logs_v2_resource_backup ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'"
).result_rows
assert result is not None
# Add cleanup to restore original table
request.addfinalizer(
lambda: signoz.telemetrystore.conn.query(
f"RENAME TABLE signoz_logs.logs_v2_resource_backup TO signoz_logs.logs_v2_resource ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'"
)
)
# Create new test tables
result = signoz.telemetrystore.conn.query(
f"""CREATE TABLE signoz_logs.logs_v2_resource ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'
(
`id` String,
`seen_at_ts_bucket_start` Int64 CODEC(Delta(8), ZSTD(1))
)
ENGINE = MergeTree()
ORDER BY id;"""
).result_rows
assert result is not None
# Add cleanup to drop test table
request.addfinalizer(
lambda: signoz.telemetrystore.conn.query(
f"DROP TABLE IF EXISTS signoz_logs.logs_v2_resource ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}';"
)
)
yield # Test runs here
@pytest.fixture(name="remove_logs_ttl_settings", scope="function")
def remove_logs_ttl_settings(signoz: types.SigNoz):
"""
Remove TTL settings from the logs tables.
For tables with retention columns, this fixture resets the _retention_days and _retention_days_cold
column defaults to 0; for the remaining tables it drops any existing TTL configuration.
"""
tables = [
"distributed_logs_v2",
"distributed_logs_v2_resource",
"logs_v2",
"logs_v2_resource",
"logs_attribute_keys",
"logs_resource_keys",
]
for table in tables:
try:
# Reset _retention_days and _retention_days_cold default values to 0 for tables that have these columns
if table in [
"logs_v2",
"logs_v2_resource",
"distributed_logs_v2",
"distributed_logs_v2_resource",
]:
reset_retention_query = f"""
ALTER TABLE signoz_logs.{table} ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'
MODIFY COLUMN _retention_days UInt16 DEFAULT 0
"""
signoz.telemetrystore.conn.query(reset_retention_query)
reset_retention_cold_query = f"""
ALTER TABLE signoz_logs.{table} ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'
MODIFY COLUMN _retention_days_cold UInt16 DEFAULT 0
"""
signoz.telemetrystore.conn.query(reset_retention_cold_query)
else:
alter_query = f"""
ALTER TABLE signoz_logs.{table} ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'
REMOVE TTL
"""
signoz.telemetrystore.conn.query(alter_query)
except Exception as e: # pylint: disable=broad-exception-caught
print(f"Error removing TTL from table {table}: {e}")
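# For a table without retention columns, e.g. logs_attribute_keys, the generated
# statement has the following shape (the cluster name is read from the container
# environment; the literal below is only a placeholder):
#
#   ALTER TABLE signoz_logs.logs_attribute_keys ON CLUSTER '<cluster>' REMOVE TTL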