import datetime
import json
from abc import ABC
from typing import Any, Callable, Generator, List, Optional

import numpy as np
import pytest
from ksuid import KsuidMs

from fixtures import types
from fixtures.fingerprint import LogsOrTracesFingerprint
from fixtures.utils import parse_timestamp

class LogsResource(ABC):
    labels: str
    fingerprint: str
    seen_at_ts_bucket_start: np.int64

    def __init__(
        self,
        labels: dict[str, str],
        fingerprint: str,
        seen_at_ts_bucket_start: np.int64,
    ) -> None:
        self.labels = json.dumps(
            labels, separators=(",", ":")
        )  # clickhouse treats {"a": "b"} differently from {"a":"b"}. In the first case it is not able to run json functions
        self.fingerprint = fingerprint
        self.seen_at_ts_bucket_start = seen_at_ts_bucket_start

    def np_arr(self) -> np.array:
        return np.array(
            [
                self.labels,
                self.fingerprint,
                self.seen_at_ts_bucket_start,
            ]
        )

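# Illustrative sketch (not part of the original fixtures): the compact separators
# passed to json.dumps above matter because the labels must be serialized without
# spaces for ClickHouse JSON functions to work on them. A quick comparison with a
# plain dict:
#
#   json.dumps({"a": "b"})                          # '{"a": "b"}'  (space after colon)
#   json.dumps({"a": "b"}, separators=(",", ":"))   # '{"a":"b"}'   (what LogsResource stores)
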
class LogsResourceOrAttributeKeys(ABC):
    name: str
    datatype: str

    def __init__(self, name: str, datatype: str) -> None:
        self.name = name
        self.datatype = datatype

    def np_arr(self) -> np.array:
        return np.array([self.name, self.datatype])

class LogsTagAttributes(ABC):
    unix_milli: np.int64
    tag_key: str
    tag_type: str
    tag_data_type: str
    string_value: str
    number_value: np.float64

    def __init__(
        self,
        timestamp: datetime.datetime,
        tag_key: str,
        tag_type: str,
        tag_data_type: str,
        string_value: Optional[str],
        number_value: np.float64,
    ) -> None:
        self.unix_milli = np.int64(int(timestamp.timestamp() * 1e3))
        self.tag_key = tag_key
        self.tag_type = tag_type
        self.tag_data_type = tag_data_type
        self.string_value = string_value or ""
        self.number_value = number_value

    def np_arr(self) -> np.array:
        return np.array(
            [
                self.unix_milli,
                self.tag_key,
                self.tag_type,
                self.tag_data_type,
                self.string_value,
                self.number_value,
            ]
        )

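# Quick check (illustrative, not part of the original fixtures): unix_milli above is
# the timestamp truncated to whole milliseconds since the epoch, e.g. for a fixed
# UTC instant:
#
#   ts = datetime.datetime(2024, 1, 1, 0, 0, 0, 500000, tzinfo=datetime.timezone.utc)
#   int(ts.timestamp() * 1e3)   # 1704067200500
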
class Logs(ABC):
    ts_bucket_start: np.uint64
    resource_fingerprint: str
    timestamp: np.uint64
    observed_timestamp: np.uint64
    id: str
    trace_id: str
    span_id: str
    trace_flags: np.uint32
    severity_text: str
    severity_number: np.uint8
    body: str
    attributes_string: dict[str, str]
    attributes_number: dict[str, np.float64]
    attributes_bool: dict[str, bool]
    resources_string: dict[str, str]
    scope_name: str
    scope_version: str
    scope_string: dict[str, str]

    resource: List[LogsResource]
    tag_attributes: List[LogsTagAttributes]
    resource_keys: List[LogsResourceOrAttributeKeys]
    attribute_keys: List[LogsResourceOrAttributeKeys]

    def __init__(
        self,
        timestamp: Optional[datetime.datetime] = None,
        resources: dict[str, Any] = {},
        attributes: dict[str, Any] = {},
        body: str = "default body",
        severity_text: str = "INFO",
        trace_id: str = "",
        span_id: str = "",
        trace_flags: np.uint32 = 0,
        scope_name: str = "",
        scope_version: str = "",
        scope_attributes: dict[str, str] = {},
    ) -> None:
        if timestamp is None:
            timestamp = datetime.datetime.now()
        self.tag_attributes = []
        self.attribute_keys = []
        self.resource_keys = []

        # Convert timestamp to uint64 nanoseconds
        self.timestamp = np.uint64(int(timestamp.timestamp() * 1e9))
        self.observed_timestamp = self.timestamp

        # Calculate ts_bucket_start (30-minute bucket):
        # round down to the nearest 30-minute interval
        minute = timestamp.minute
        if minute < 30:
            bucket_minute = 0
        else:
            bucket_minute = 30

        bucket_start = timestamp.replace(minute=bucket_minute, second=0, microsecond=0)
        self.ts_bucket_start = np.uint64(int(bucket_start.timestamp()))

        # Generate ksuid by using the timestamp
        self.id = str(KsuidMs(datetime=timestamp))

        # Initialize trace fields
        self.trace_id = trace_id
        self.span_id = span_id
        self.trace_flags = trace_flags

        # Set severity fields
        self.severity_text = severity_text
        self.severity_number = self._get_severity_number(severity_text)

        # Set body
        self.body = body

        # Process resources and attributes
        self.resources_string = {k: str(v) for k, v in resources.items()}
        for k, v in self.resources_string.items():
            self.tag_attributes.append(
                LogsTagAttributes(
                    timestamp=timestamp,
                    tag_key=k,
                    tag_type="resource",
                    tag_data_type="string",
                    string_value=v,
                    number_value=None,
                )
            )
            self.resource_keys.append(
                LogsResourceOrAttributeKeys(name=k, datatype="string")
            )

        # Calculate resource fingerprint
        self.resource_fingerprint = LogsOrTracesFingerprint(
            self.resources_string
        ).calculate()

        # Process attributes by type
        self.attributes_string = {}
        self.attributes_number = {}
        self.attributes_bool = {}

        for k, v in attributes.items():
            if isinstance(v, bool):
                self.attributes_bool[k] = v
                self.tag_attributes.append(
                    LogsTagAttributes(
                        timestamp=timestamp,
                        tag_key=k,
                        tag_type="tag",
                        tag_data_type="bool",
                        string_value=None,
                        number_value=None,
                    )
                )
                self.attribute_keys.append(
                    LogsResourceOrAttributeKeys(name=k, datatype="bool")
                )
            elif isinstance(v, (int, float)):
                self.attributes_number[k] = np.float64(v)
                self.tag_attributes.append(
                    LogsTagAttributes(
                        timestamp=timestamp,
                        tag_key=k,
                        tag_type="tag",
                        tag_data_type="float64",
                        string_value=None,
                        number_value=np.float64(v),
                    )
                )
                self.attribute_keys.append(
                    LogsResourceOrAttributeKeys(name=k, datatype="float64")
                )
            else:
                self.attributes_string[k] = str(v)
                self.tag_attributes.append(
                    LogsTagAttributes(
                        timestamp=timestamp,
                        tag_key=k,
                        tag_type="tag",
                        tag_data_type="string",
                        string_value=str(v),
                        number_value=None,
                    )
                )
                self.attribute_keys.append(
                    LogsResourceOrAttributeKeys(name=k, datatype="string")
                )

        # Initialize scope fields
        self.scope_name = scope_name
        self.scope_version = scope_version
        self.scope_string = {k: str(v) for k, v in scope_attributes.items()}
        for k, v in self.scope_string.items():
            self.tag_attributes.append(
                LogsTagAttributes(
                    timestamp=timestamp,
                    tag_key=k,
                    tag_type="scope",
                    tag_data_type="string",
                    string_value=v,
                    number_value=None,
                )
            )

        self.resource = []
        self.resource.append(
            LogsResource(
                labels=self.resources_string,
                fingerprint=self.resource_fingerprint,
                seen_at_ts_bucket_start=self.ts_bucket_start,
            )
        )

        # Log fields (severity)
        self.tag_attributes.append(
            LogsTagAttributes(
                timestamp=timestamp,
                tag_key="severity_text",
                tag_type="logfield",
                tag_data_type="string",
                string_value=self.severity_text,
                number_value=None,
            )
        )
        self.attribute_keys.append(
            LogsResourceOrAttributeKeys(name="severity_text", datatype="string")
        )

        self.tag_attributes.append(
            LogsTagAttributes(
                timestamp=timestamp,
                tag_key="severity_number",
                tag_type="logfield",
                tag_data_type="float64",
                string_value=None,
                number_value=float(self.severity_number),
            )
        )
        self.attribute_keys.append(
            LogsResourceOrAttributeKeys(name="severity_number", datatype="float64")
        )

    def _get_severity_number(self, severity_text: str) -> np.uint8:
        """Convert severity text to numeric value"""
        severity_map = {
            "TRACE": 1,
            "DEBUG": 5,
            "INFO": 9,
            "WARN": 13,
            "ERROR": 17,
            "FATAL": 21,
        }

        return np.uint8(severity_map.get(severity_text.upper(), 9))  # Default to INFO

    def np_arr(self) -> np.array:
        """Return log data as numpy array for database insertion"""
        return np.array(
            [
                self.ts_bucket_start,
                self.resource_fingerprint,
                self.timestamp,
                self.observed_timestamp,
                self.id,
                self.trace_id,
                self.span_id,
                self.trace_flags,
                self.severity_text,
                self.severity_number,
                self.body,
                self.attributes_string,
                self.attributes_number,
                self.attributes_bool,
                self.resources_string,
                self.scope_name,
                self.scope_version,
                self.scope_string,
                self.resources_string,
            ]
        )

    @classmethod
    def from_dict(
        cls,
        data: dict,
    ) -> "Logs":
        """Create a Logs instance from a dict."""
        # parse timestamp from iso format
        timestamp = parse_timestamp(data["timestamp"])
        return cls(
            timestamp=timestamp,
            resources=data.get("resources", {}),
            attributes=data.get("attributes", {}),
            body=data["body"],
            severity_text=data.get("severity_text", "INFO"),
        )

    @classmethod
    def load_from_file(
        cls,
        file_path: str,
        base_time: Optional[datetime.datetime] = None,
    ) -> List["Logs"]:
        """Load logs from a JSONL file."""

        data_list = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                data_list.append(json.loads(line))

        # If base_time provided, calculate time offset
        time_offset = datetime.timedelta(0)
        if base_time is not None:
            # Find earliest timestamp
            earliest = None
            for data in data_list:
                ts = parse_timestamp(data["timestamp"])
                if earliest is None or ts < earliest:
                    earliest = ts
            if earliest is not None:
                time_offset = base_time - earliest

        logs = []
        for data in data_list:
            original_ts = parse_timestamp(data["timestamp"])
            adjusted_ts = original_ts + time_offset
            data["timestamp"] = adjusted_ts.isoformat()
            logs.append(cls.from_dict(data))

        return logs

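# Illustrative usage sketch (not part of the original fixtures). The resource and
# attribute names below are hypothetical; they only demonstrate how values are
# routed into the typed attribute maps and how severity_text maps to a number.
#
#   log = Logs(
#       resources={"service.name": "demo-service"},
#       attributes={"http.status_code": 200, "retry": False, "path": "/health"},
#       body="request completed",
#       severity_text="WARN",
#   )
#   log.severity_number      # 13 (WARN); unknown levels default to 9 (INFO)
#   log.attributes_number    # {"http.status_code": 200.0}
#   log.attributes_bool      # {"retry": False}
#   log.attributes_string    # {"path": "/health"}
#
# load_from_file expects one JSON object per line (JSONL), for example:
#
#   {"timestamp": "2024-01-01T00:00:00+00:00", "body": "user logged in", "severity_text": "INFO", "resources": {"service.name": "demo-service"}, "attributes": {"user.id": "u-1"}}
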
@pytest.fixture(name="insert_logs", scope="function")
|
|
def insert_logs(
|
|
clickhouse: types.TestContainerClickhouse,
|
|
) -> Generator[Callable[[List[Logs]], None], Any, None]:
|
|
def _insert_logs(logs: List[Logs]) -> None:
|
|
"""
|
|
Insert logs into ClickHouse tables following the same logic as the Go exporter.
|
|
This function handles insertion into multiple tables:
|
|
- distributed_logs_v2 (main logs table)
|
|
- distributed_logs_v2_resource (resource fingerprints)
|
|
- distributed_tag_attributes_v2 (tag attributes)
|
|
- distributed_logs_attribute_keys (attribute keys)
|
|
- distributed_logs_resource_keys (resource keys)
|
|
"""
|
|
resources: List[LogsResource] = []
|
|
for log in logs:
|
|
resources.extend(log.resource)
|
|
|
|
if len(resources) > 0:
|
|
clickhouse.conn.insert(
|
|
database="signoz_logs",
|
|
table="distributed_logs_v2_resource",
|
|
data=[resource.np_arr() for resource in resources],
|
|
column_names=[
|
|
"labels",
|
|
"fingerprint",
|
|
"seen_at_ts_bucket_start",
|
|
],
|
|
)
|
|
|
|
tag_attributes: List[LogsTagAttributes] = []
|
|
for log in logs:
|
|
tag_attributes.extend(log.tag_attributes)
|
|
|
|
if len(tag_attributes) > 0:
|
|
clickhouse.conn.insert(
|
|
database="signoz_logs",
|
|
table="distributed_tag_attributes_v2",
|
|
data=[tag_attribute.np_arr() for tag_attribute in tag_attributes],
|
|
)
|
|
|
|
attribute_keys: List[LogsResourceOrAttributeKeys] = []
|
|
for log in logs:
|
|
attribute_keys.extend(log.attribute_keys)
|
|
|
|
if len(attribute_keys) > 0:
|
|
clickhouse.conn.insert(
|
|
database="signoz_logs",
|
|
table="distributed_logs_attribute_keys",
|
|
data=[attribute_key.np_arr() for attribute_key in attribute_keys],
|
|
)
|
|
|
|
resource_keys: List[LogsResourceOrAttributeKeys] = []
|
|
for log in logs:
|
|
resource_keys.extend(log.resource_keys)
|
|
|
|
if len(resource_keys) > 0:
|
|
clickhouse.conn.insert(
|
|
database="signoz_logs",
|
|
table="distributed_logs_resource_keys",
|
|
data=[resource_key.np_arr() for resource_key in resource_keys],
|
|
)
|
|
|
|
clickhouse.conn.insert(
|
|
database="signoz_logs",
|
|
table="distributed_logs_v2",
|
|
data=[log.np_arr() for log in logs],
|
|
column_names=[
|
|
"ts_bucket_start",
|
|
"resource_fingerprint",
|
|
"timestamp",
|
|
"observed_timestamp",
|
|
"id",
|
|
"trace_id",
|
|
"span_id",
|
|
"trace_flags",
|
|
"severity_text",
|
|
"severity_number",
|
|
"body",
|
|
"attributes_string",
|
|
"attributes_number",
|
|
"attributes_bool",
|
|
"resources_string",
|
|
"scope_name",
|
|
"scope_version",
|
|
"scope_string",
|
|
"resource",
|
|
],
|
|
)
|
|
|
|
yield _insert_logs
|
|
|
|
clickhouse.conn.query(
|
|
f"TRUNCATE TABLE signoz_logs.logs_v2 ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
|
|
)
|
|
clickhouse.conn.query(
|
|
f"TRUNCATE TABLE signoz_logs.logs_v2_resource ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
|
|
)
|
|
clickhouse.conn.query(
|
|
f"TRUNCATE TABLE signoz_logs.tag_attributes_v2 ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
|
|
)
|
|
clickhouse.conn.query(
|
|
f"TRUNCATE TABLE signoz_logs.logs_attribute_keys ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
|
|
)
|
|
clickhouse.conn.query(
|
|
f"TRUNCATE TABLE signoz_logs.logs_resource_keys ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
|
|
)
|
|
|
|
|
|
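# Illustrative usage sketch (hypothetical test, not part of the original fixtures):
# a test requests the fixture by its name and passes it a list of Logs objects; the
# fixture truncates the logs tables again during teardown.
#
#   def test_logs_are_queryable(insert_logs, signoz: types.SigNoz):  # hypothetical test
#       insert_logs([Logs(body="hello", resources={"service.name": "demo-service"})])
#       ...  # query the SigNoz API and assert the inserted log is returned
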
@pytest.fixture(name="ttl_legacy_logs_v2_table_setup", scope="function")
|
|
def ttl_legacy_logs_v2_table_setup(request, signoz: types.SigNoz):
|
|
"""
|
|
Fixture to setup and teardown legacy TTL test environment.
|
|
It renames existing logs tables to backup names and creates new empty tables for testing.
|
|
After the test, it restores the original tables.
|
|
"""
|
|
|
|
# Setup code
|
|
result = signoz.telemetrystore.conn.query(
|
|
f"RENAME TABLE signoz_logs.logs_v2 TO signoz_logs.logs_v2_backup ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'"
|
|
).result_rows
|
|
assert result is not None
|
|
# Add cleanup to restore original table
|
|
request.addfinalizer(
|
|
lambda: signoz.telemetrystore.conn.query(
|
|
f"RENAME TABLE signoz_logs.logs_v2_backup TO signoz_logs.logs_v2 ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'"
|
|
)
|
|
)
|
|
|
|
# Create new test tables
|
|
result = signoz.telemetrystore.conn.query(
|
|
f"""CREATE TABLE signoz_logs.logs_v2 ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'
|
|
(
|
|
`id` String,
|
|
`timestamp` UInt64 CODEC(DoubleDelta, LZ4)
|
|
|
|
)
|
|
ENGINE = MergeTree()
|
|
ORDER BY id;"""
|
|
).result_rows
|
|
|
|
assert result is not None
|
|
# Add cleanup to drop test table
|
|
request.addfinalizer(
|
|
lambda: signoz.telemetrystore.conn.query(
|
|
f"DROP TABLE IF EXISTS signoz_logs.logs_v2 ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'"
|
|
)
|
|
)
|
|
|
|
yield # Test runs here
|
|
|
|
|
|
@pytest.fixture(name="ttl_legacy_logs_v2_resource_table_setup", scope="function")
|
|
def ttl_legacy_logs_v2_resource_table_setup(request, signoz: types.SigNoz):
|
|
"""
|
|
Fixture to setup and teardown legacy TTL test environment.
|
|
It renames existing logs tables to backup names and creates new empty tables for testing.
|
|
After the test, it restores the original tables.
|
|
"""
|
|
|
|
# Setup code
|
|
result = signoz.telemetrystore.conn.query(
|
|
f"RENAME TABLE signoz_logs.logs_v2_resource TO signoz_logs.logs_v2_resource_backup ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'"
|
|
).result_rows
|
|
assert result is not None
|
|
# Add cleanup to restore original table
|
|
request.addfinalizer(
|
|
lambda: signoz.telemetrystore.conn.query(
|
|
f"RENAME TABLE signoz_logs.logs_v2_resource_backup TO signoz_logs.logs_v2_resource ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'"
|
|
)
|
|
)
|
|
|
|
# Create new test tables
|
|
result = signoz.telemetrystore.conn.query(
|
|
f"""CREATE TABLE signoz_logs.logs_v2_resource ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'
|
|
(
|
|
`id` String,
|
|
`seen_at_ts_bucket_start` Int64 CODEC(Delta(8), ZSTD(1))
|
|
)
|
|
ENGINE = MergeTree()
|
|
ORDER BY id;"""
|
|
).result_rows
|
|
|
|
assert result is not None
|
|
# Add cleanup to drop test table
|
|
request.addfinalizer(
|
|
lambda: signoz.telemetrystore.conn.query(
|
|
f"DROP TABLE IF EXISTS signoz_logs.logs_v2_resource ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}';"
|
|
)
|
|
)
|
|
|
|
yield # Test runs here
|
|
|
|
|
|
@pytest.fixture(name="remove_logs_ttl_settings", scope="function")
|
|
def remove_logs_ttl_settings(signoz: types.SigNoz):
|
|
"""
|
|
Remove TTL settings from the specified logs table.
|
|
This function alters the table to drop any existing TTL configurations
|
|
and resets the _retention_days default value to 0.
|
|
"""
|
|
tables = [
|
|
"distributed_logs_v2",
|
|
"distributed_logs_v2_resource",
|
|
"logs_v2",
|
|
"logs_v2_resource",
|
|
"logs_attribute_keys",
|
|
"logs_resource_keys",
|
|
]
|
|
for table in tables:
|
|
|
|
try:
|
|
# Reset _retention_days and _retention_days_cold default values to 0 for tables that have these columns
|
|
if table in [
|
|
"logs_v2",
|
|
"logs_v2_resource",
|
|
"distributed_logs_v2",
|
|
"distributed_logs_v2_resource",
|
|
]:
|
|
reset_retention_query = f"""
|
|
ALTER TABLE signoz_logs.{table} ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'
|
|
MODIFY COLUMN _retention_days UInt16 DEFAULT 0
|
|
"""
|
|
signoz.telemetrystore.conn.query(reset_retention_query)
|
|
|
|
reset_retention_cold_query = f"""
|
|
ALTER TABLE signoz_logs.{table} ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'
|
|
MODIFY COLUMN _retention_days_cold UInt16 DEFAULT 0
|
|
"""
|
|
signoz.telemetrystore.conn.query(reset_retention_cold_query)
|
|
else:
|
|
alter_query = f"""
|
|
ALTER TABLE signoz_logs.{table} ON CLUSTER '{signoz.telemetrystore.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}'
|
|
REMOVE TTL
|
|
"""
|
|
signoz.telemetrystore.conn.query(alter_query)
|
|
except Exception as e: # pylint: disable=broad-exception-caught
|
|
print(f"Error removing TTL from table {table}: {e}")
|