Added integration tests for TTL methods (#9289)

This pull request adds integration tests for the TTL (Time-To-Live) settings APIs and refactors how TTL settings are applied for logs, metrics, and traces in the ClickHouse reader service. The dedicated setTTLLogs method is removed, the metrics logic moves into a new setTTLMetrics method, and SetTTL now routes requests by type to the traces or metrics handler only; logs retention is no longer handled by this legacy path and is instead managed through the v2 custom-retention API exercised by the new tests. A ttl entry is also added to the integration-test job matrix.
Tushar Vats
2025-10-10 22:20:25 +05:30
committed by GitHub
parent 6c59b5405e
commit 3123447005
4 changed files with 713 additions and 248 deletions
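For orientation, here is a minimal sketch of how the two TTL endpoints are exercised after this change, modeled on the integration tests added below. The base URL and token are placeholders; the endpoint paths and field names follow the request payloads used in those tests. Traces and metrics go through the v1 endpoint as query parameters, while logs retention goes through the v2 custom-retention endpoint as a JSON body.

import requests

BASE_URL = "http://localhost:8080"           # placeholder: SigNoz query-service host
HEADERS = {"Authorization": "Bearer <jwt>"}  # placeholder: admin JWT

# Traces/metrics retention: v1 endpoint, parameters passed as a query string.
requests.post(
    f"{BASE_URL}/api/v1/settings/ttl",
    params={"type": "traces", "duration": "3600h"},
    headers=HEADERS,
    timeout=30,
)

# Logs retention: v2 custom-retention endpoint, configuration passed as JSON.
requests.post(
    f"{BASE_URL}/api/v2/settings/ttl",
    json={
        "type": "logs",
        "defaultTTLDays": 30,
        "ttlConditions": [
            {"conditions": [{"key": "service_name", "values": ["frontend"]}], "ttlDays": 60}
        ],
        "coldStorageVolume": "",
        "coldStorageDuration": 0,
    },
    headers=HEADERS,
    timeout=30,
)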


@@ -17,6 +17,7 @@ jobs:
- bootstrap
- auth
- querier
- ttl
sqlstore-provider:
- postgres
- sqlite


@@ -1276,154 +1276,6 @@ func getLocalTableName(tableName string) string {
}
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
hasCustomRetention, err := r.hasCustomRetentionColumn(ctx)
if hasCustomRetention {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("SetTTLV2 only supported")}
}
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing TTL")}
}
// uuid is used as transaction id
uuidWithHyphen := uuid.New()
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
coldStorageDuration := -1
if len(params.ColdStorageVolume) > 0 {
coldStorageDuration = int(params.ToColdStorageDuration)
}
tableNameArray := []string{r.logsDB + "." + r.logsLocalTableV2, r.logsDB + "." + r.logsResourceLocalTableV2}
// check if there are existing operations pending for these tables
for _, tableName := range tableNameArray {
statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
}
}
// TTL query for logs_v2 table
ttlLogsV2 := fmt.Sprintf(
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + "+
"INTERVAL %v SECOND DELETE", tableNameArray[0], r.cluster, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
ttlLogsV2 += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+
" + INTERVAL %v SECOND TO VOLUME '%s'",
params.ToColdStorageDuration, params.ColdStorageVolume)
}
// TTL query for logs_v2_resource table
// adding 1800 as our bucket size is 1800 seconds
ttlLogsV2Resource := fmt.Sprintf(
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + "+
"INTERVAL %v SECOND DELETE", tableNameArray[1], r.cluster, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
ttlLogsV2Resource += fmt.Sprintf(", toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + "+
"INTERVAL %v SECOND TO VOLUME '%s'",
params.ToColdStorageDuration, params.ColdStorageVolume)
}
ttlPayload := map[string]string{
tableNameArray[0]: ttlLogsV2,
tableNameArray[1]: ttlLogsV2Resource,
}
// set the ttl only if nothing is pending and there are no errors
go func(ttlPayload map[string]string) {
for tableName, query := range ttlPayload {
// https://github.com/SigNoz/signoz/issues/5470
// we will change ttl for only the new parts and not the old ones
query += " SETTINGS materialize_ttl_after_modify=0"
ttl := types.TTLSetting{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
TransactionID: uuid,
TableName: tableName,
TTL: int(params.DelDuration),
Status: constants.StatusPending,
ColdStorageTTL: coldStorageDuration,
OrgID: orgID,
}
_, dbErr := r.
sqlDB.
BunDB().
NewInsert().
Model(&ttl).
Exec(ctx)
if dbErr != nil {
zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
return
}
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
if err != nil {
zap.L().Error("error in setting cold storage", zap.Error(err))
statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
if err == nil {
_, dbErr := r.
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
}
return
}
zap.L().Info("Executing TTL request: ", zap.String("request", query))
statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
if err := r.db.Exec(ctx, query); err != nil {
zap.L().Error("error while setting ttl", zap.Error(err))
_, dbErr := r.
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
return
}
_, dbErr = r.
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusSuccess).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
}
}(ttlPayload)
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
// uuid is used as transaction id
uuidWithHyphen := uuid.New()
@@ -2043,6 +1895,19 @@ func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditi
func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
// Keep only latest 100 transactions/requests
r.deleteTtlTransactions(ctx, orgID, 100)
switch params.Type {
case constants.TraceTTL:
return r.setTTLTraces(ctx, orgID, params)
case constants.MetricsTTL:
return r.setTTLMetrics(ctx, orgID, params)
default:
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v", params.Type)}
}
}
func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
// uuid is used as transaction id
uuidWithHyphen := uuid.New()
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
@@ -2051,95 +1916,69 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *mod
if len(params.ColdStorageVolume) > 0 {
coldStorageDuration = int(params.ToColdStorageDuration)
}
switch params.Type {
case constants.TraceTTL:
return r.setTTLTraces(ctx, orgID, params)
case constants.MetricsTTL:
tableNames := []string{
signozMetricDBName + "." + signozSampleLocalTableName,
signozMetricDBName + "." + signozSamplesAgg5mLocalTableName,
signozMetricDBName + "." + signozSamplesAgg30mLocalTableName,
signozMetricDBName + "." + signozExpHistLocalTableName,
signozMetricDBName + "." + signozTSLocalTableNameV4,
signozMetricDBName + "." + signozTSLocalTableNameV46Hrs,
signozMetricDBName + "." + signozTSLocalTableNameV41Day,
signozMetricDBName + "." + signozTSLocalTableNameV41Week,
tableNames := []string{
signozMetricDBName + "." + signozSampleLocalTableName,
signozMetricDBName + "." + signozSamplesAgg5mLocalTableName,
signozMetricDBName + "." + signozSamplesAgg30mLocalTableName,
signozMetricDBName + "." + signozExpHistLocalTableName,
signozMetricDBName + "." + signozTSLocalTableNameV4,
signozMetricDBName + "." + signozTSLocalTableNameV46Hrs,
signozMetricDBName + "." + signozTSLocalTableNameV41Day,
signozMetricDBName + "." + signozTSLocalTableNameV41Week,
}
for _, tableName := range tableNames {
statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
for _, tableName := range tableNames {
if statusItem.Status == constants.StatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
}
}
metricTTL := func(tableName string) {
ttl := types.TTLSetting{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
TransactionID: uuid,
TableName: tableName,
TTL: int(params.DelDuration),
Status: constants.StatusPending,
ColdStorageTTL: coldStorageDuration,
OrgID: orgID,
}
_, dbErr := r.
sqlDB.
BunDB().
NewInsert().
Model(&ttl).
Exec(ctx)
if dbErr != nil {
zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
return
}
timeColumn := "timestamp_ms"
if strings.Contains(tableName, "v4") || strings.Contains(tableName, "exp_hist") {
timeColumn = "unix_milli"
}
req := fmt.Sprintf(
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(toUInt32(%s / 1000), 'UTC') + "+
"INTERVAL %v SECOND DELETE", tableName, r.cluster, timeColumn, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
req += fmt.Sprintf(", toDateTime(toUInt32(%s / 1000), 'UTC')"+
" + INTERVAL %v SECOND TO VOLUME '%s'",
timeColumn, params.ToColdStorageDuration, params.ColdStorageVolume)
}
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
if err != nil {
zap.L().Error("Error in setting cold storage", zap.Error(err))
statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
}
}
metricTTL := func(tableName string) {
ttl := types.TTLSetting{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
TransactionID: uuid,
TableName: tableName,
TTL: int(params.DelDuration),
Status: constants.StatusPending,
ColdStorageTTL: coldStorageDuration,
OrgID: orgID,
}
_, dbErr := r.
sqlDB.
BunDB().
NewInsert().
Model(&ttl).
Exec(ctx)
if dbErr != nil {
zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
return
}
timeColumn := "timestamp_ms"
if strings.Contains(tableName, "v4") || strings.Contains(tableName, "exp_hist") {
timeColumn = "unix_milli"
}
req := fmt.Sprintf(
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(toUInt32(%s / 1000), 'UTC') + "+
"INTERVAL %v SECOND DELETE", tableName, r.cluster, timeColumn, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
req += fmt.Sprintf(", toDateTime(toUInt32(%s / 1000), 'UTC')"+
" + INTERVAL %v SECOND TO VOLUME '%s'",
timeColumn, params.ToColdStorageDuration, params.ColdStorageVolume)
}
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
if err != nil {
zap.L().Error("Error in setting cold storage", zap.Error(err))
statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
if err == nil {
_, dbErr := r.
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
}
return
}
req += " SETTINGS materialize_ttl_after_modify=0"
zap.L().Info("Executing TTL request: ", zap.String("request", req))
statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
if err := r.db.Exec(ctx, req); err != nil {
zap.L().Error("error while setting ttl.", zap.Error(err))
if err == nil {
_, dbErr := r.
sqlDB.
BunDB().
@@ -2153,32 +1992,46 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *mod
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
return
}
_, dbErr = r.
return
}
req += " SETTINGS materialize_ttl_after_modify=0"
zap.L().Info("Executing TTL request: ", zap.String("request", req))
statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
if err := r.db.Exec(ctx, req); err != nil {
zap.L().Error("error while setting ttl.", zap.Error(err))
_, dbErr := r.
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusSuccess).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
return
}
for _, tableName := range tableNames {
go metricTTL(tableName)
_, dbErr = r.
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusSuccess).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
case constants.LogsTTL:
return r.setTTLLogs(ctx, orgID, params)
default:
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v", params.Type)}
}
for _, tableName := range tableNames {
go metricTTL(tableName)
}
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
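To make the metrics path above concrete, the following small sketch renders the statement shape that setTTLMetrics builds for one local table. The table name, cluster, and durations are illustrative placeholders: the statement is a MODIFY TTL on the table's time column, an optional TO VOLUME clause when a cold-storage volume is configured, and the materialize_ttl_after_modify=0 setting so only newly written parts are rewritten.

# Illustrative sketch mirroring the fmt.Sprintf calls in setTTLMetrics above.
cluster = "cluster"                   # r.cluster (placeholder)
table = "signoz_metrics.samples_v4"   # one of the local tables in tableNames (assumed name)
time_column = "unix_milli"            # "timestamp_ms" for older tables, "unix_milli" for v4/exp_hist tables
del_seconds = 90 * 24 * 3600          # params.DelDuration: delete after 90 days
cold_volume = "metrics_cold_vol"      # params.ColdStorageVolume; empty string skips the TO VOLUME clause
cold_seconds = 20 * 24 * 3600         # params.ToColdStorageDuration: move to cold storage after 20 days

query = (
    f"ALTER TABLE {table} ON CLUSTER {cluster} MODIFY TTL "
    f"toDateTime(toUInt32({time_column} / 1000), 'UTC') + INTERVAL {del_seconds} SECOND DELETE"
)
if cold_volume:
    query += (
        f", toDateTime(toUInt32({time_column} / 1000), 'UTC') "
        f"+ INTERVAL {cold_seconds} SECOND TO VOLUME '{cold_volume}'"
    )
query += " SETTINGS materialize_ttl_after_modify=0"
print(query)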


@@ -29,7 +29,15 @@ class LogsResource(ABC):
self.seen_at_ts_bucket_start = seen_at_ts_bucket_start
def np_arr(self) -> np.array:
return np.array([self.labels, self.fingerprint, self.seen_at_ts_bucket_start, np.uint64(10),np.uint64(15)])
return np.array(
[
self.labels,
self.fingerprint,
self.seen_at_ts_bucket_start,
np.uint64(10),
np.uint64(15),
]
)
class LogsResourceOrAttributeKeys(ABC):
@@ -381,7 +389,7 @@ def insert_logs(
table="distributed_logs_resource_keys",
data=[resource_key.np_arr() for resource_key in resource_keys],
)
clickhouse.conn.insert(
database="signoz_logs",
table="distributed_logs_v2",


@@ -0,0 +1,603 @@
"""
Summary:
This test file contains integration tests for Time-To-Live (TTL) and custom retention policies in SigNoz's query service.
It verifies the correct behavior of TTL settings for traces, metrics, and logs, including support for cold storage, custom retention conditions, error handling for invalid configurations, and retrieval of TTL settings.
"""
import time
from http import HTTPStatus
import pytest
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.logger import setup_logger
from fixtures.logs import Logs
logger = setup_logger(__name__)
@pytest.fixture(name="ttl_test_suite_setup", scope="package", autouse=True)
def ttl_test_suite_setup(create_user_admin): # pylint: disable=unused-argument
# This fixture creates an admin user for the entire ttl test suite
# The create_user_admin fixture is executed just by being a dependency
print("Setting up ttl test suite")
yield
def test_set_ttl_traces_success(signoz: types.SigNoz, get_jwt_token):
"""Test setting TTL for traces with new ttlConfig structure."""
payload = {
"type": "traces",
"duration": "3600h",
}
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/settings/ttl"),
params=payload,
headers=headers,
timeout=30,
)
print(response.text)
assert response.status_code == HTTPStatus.OK
response_data = response.json()
assert "message" in response_data
assert "successfully set up" in response_data["message"].lower()
# Verify TTL settings in Clickhouse
# Allow some time for the TTL to be applied
time.sleep(2)
# Check TTL settings on relevant tables
tables_to_check = [
"signoz_index_v3",
"traces_v3_resource",
"signoz_error_index_v2",
"usage_explorer",
"dependency_graph_minutes_v2",
"trace_summary",
]
# Query to get table engine info which includes TTL
table_list = ", ".join(f"'{table}'" for table in tables_to_check)
query = f"SELECT engine_full FROM system.tables WHERE table in [{table_list}]"
result = signoz.telemetrystore.conn.query(query).result_rows
# Verify TTL exists in all table definitions
assert all("TTL" in r[0] for r in result)
assert all(" SETTINGS" in r[0] for r in result)
ttl_parts = [r[0].split("TTL ")[1].split(" SETTINGS")[0] for r in result]
# All TTLs should include toIntervalSecond(12960000), which is 3600h (3600 * 3600 s)
assert all("toIntervalSecond(12960000)" in ttl_part for ttl_part in ttl_parts)
def test_set_ttl_traces_with_cold_storage(signoz: types.SigNoz, get_jwt_token):
"""Test setting TTL for traces with cold storage configuration."""
payload = {
"type": "traces",
"duration": f"{90*24}h", # 90 days in hours
"coldStorageVolume": "cold_storage_vol",
"toColdStorageDuration": f"{30*24}h", # 30 days in hours
}
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/settings/ttl"),
params=payload,
headers=headers,
timeout=30,
)
assert response.status_code == HTTPStatus.OK
response_data = response.json()
assert "message" in response_data
assert "successfully set up" in response_data["message"].lower()
def test_set_ttl_metrics_success(signoz: types.SigNoz, get_jwt_token):
"""Test setting TTL for metrics using the new setTTLMetrics method."""
payload = {
"type": "metrics",
"duration": f"{90*24}h", # 90 days in hours
"coldStorageVolume": "",
"toColdStorageDuration": 0,
}
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/settings/ttl"),
params=payload,
headers=headers,
timeout=30,
)
assert response.status_code == HTTPStatus.OK
response_data = response.json()
assert "message" in response_data
assert "successfully set up" in response_data["message"].lower()
# Verify TTL settings in Clickhouse
# Allow some time for the TTL to be applied
time.sleep(2)
# Check TTL settings on relevant metrics tables
tables_to_check = [
"samples_v4",
"samples_v4_agg_5m",
"samples_v4_agg_30m",
"time_series_v4",
"time_series_v4_6hrs",
"time_series_v4_1day",
"time_series_v4_1week",
]
# Query to get table engine info which includes TTL
table_list = "', '".join(tables_to_check)
query = f"SELECT engine_full FROM system.tables WHERE table in ['{table_list}']"
result = signoz.telemetrystore.conn.query(query).result_rows
# Verify TTL exists in all table definitions
assert all("TTL" in r[0] for r in result)
assert all(" SETTINGS" in r[0] for r in result)
ttl_parts = [r[0].split("TTL ")[1].split(" SETTINGS")[0] for r in result]
# All TTLs should include toIntervalSecond(7776000), which is 90*24h (2160 * 3600 s)
assert all("toIntervalSecond(7776000)" in ttl_part for ttl_part in ttl_parts)
def test_set_ttl_metrics_with_cold_storage(signoz: types.SigNoz, get_jwt_token):
"""Test setting TTL for metrics with cold storage configuration."""
payload = {
"type": "metrics",
"duration": f"{90*24}h", # 90 days in hours
"coldStorageVolume": "metrics_cold_vol",
"toColdStorageDuration": f"{20*24}h", # 20 days in hours
}
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/settings/ttl"),
params=payload,
headers=headers,
timeout=30,
)
assert response.status_code == HTTPStatus.OK
response_data = response.json()
assert "message" in response_data
assert "successfully set up" in response_data["message"].lower()
def test_set_ttl_invalid_type(signoz: types.SigNoz, get_jwt_token):
"""Test setting TTL with invalid type returns error."""
payload = {
"type": "invalid_type",
"duration": f"{90*24}h",
"coldStorageVolume": "",
"toColdStorageDuration": 0,
}
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/settings/ttl"),
params=payload,
headers=headers,
timeout=30,
)
assert response.status_code == HTTPStatus.BAD_REQUEST
def test_set_custom_retention_ttl_basic(signoz: types.SigNoz, get_jwt_token):
"""Test setting custom retention TTL with basic configuration."""
payload = {
"type": "logs",
"defaultTTLDays": 100,
"ttlConditions": [],
"coldStorageVolume": "",
"coldStorageDuration": 0,
}
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v2/settings/ttl"),
json=payload,
headers=headers,
timeout=30,
)
assert response.status_code == HTTPStatus.OK
response_data = response.json()
assert "message" in response_data
# Verify TTL settings in Clickhouse
# Allow some time for the TTL to be applied
time.sleep(2)
# Check TTL settings on relevant tables
tables_to_check = [
"logs_v2",
"logs_v2_resource",
]
# Query to get table engine info which includes TTL
table_list = "', '".join(tables_to_check)
query = f"SELECT engine_full FROM system.tables WHERE table in ['{table_list}']"
result = signoz.telemetrystore.conn.query(query).result_rows
# Verify TTL exists in all table definitions
assert all("TTL" in r[0] for r in result)
assert all(" SETTINGS" in r[0] for r in result)
ttl_parts = [r[0].split("TTL ")[1].split(" SETTINGS")[0] for r in result]
# Also verify the TTL parts contain retention_days
assert all("_retention_days" in ttl_part for ttl_part in ttl_parts)
# Query to describe tables and check retention_days column
for table in tables_to_check:
describe_query = f"DESCRIBE TABLE signoz_logs.{table}"
describe_result = signoz.telemetrystore.conn.query(describe_query).result_rows
# Find the _retention_days column
retention_col = next(
(row for row in describe_result if row[0] == "_retention_days"), None
)
assert (
retention_col is not None
), f"_retention_days column not found in table {table}"
assert (
retention_col[1] == "UInt16"
), f"Expected _retention_days to be UInt16 in table {table}, but got {retention_col[1]}"
assert (
retention_col[3] == "100"
), f"Expected default value of _retention_days to be 100 in table {table}, but got {retention_col[3]}"
def test_set_custom_retention_ttl_with_conditions(
signoz: types.SigNoz, get_jwt_token, insert_logs
):
"""Test setting custom retention TTL with filter conditions."""
payload = {
"type": "logs",
"defaultTTLDays": 30,
"ttlConditions": [
{
"conditions": [
{"key": "service_name", "values": ["frontend", "backend"]}
],
"ttlDays": 60,
}
],
"coldStorageVolume": "",
"coldStorageDuration": 0,
}
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v2/settings/ttl"),
json=payload,
headers=headers,
timeout=30,
)
assert response.status_code == HTTPStatus.BAD_REQUEST
# Need to ensure that "severity" and "service_name" keys exist in logsAttributeKeys table
# Insert some logs with these attribute keys
logs = [
Logs(resources={"service_name": "frontend"}, severity_text="ERROR"),
Logs(resources={"service_name": "backend"}, severity_text="FATAL"),
]
insert_logs(logs)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v2/settings/ttl"),
json=payload,
headers=headers,
timeout=30,
)
assert response.status_code == HTTPStatus.OK
response_data = response.json()
assert "message" in response_data
def test_set_custom_retention_ttl_with_cold_storage(
signoz: types.SigNoz, get_jwt_token, insert_logs
):
"""Test setting custom retention TTL with cold storage configuration."""
payload = {
"type": "logs",
"defaultTTLDays": 60,
"ttlConditions": [
{
"conditions": [{"key": "environment", "values": ["production"]}],
"ttlDays": 180,
}
],
"coldStorageVolume": "logs_cold_storage",
"coldStorageDuration": 30, # 30 days to cold storage
}
# Insert some logs with these attribute keys
logs = [
Logs(resources={"environment": "production"}, severity_text="ERROR"),
]
insert_logs(logs)
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v2/settings/ttl"),
json=payload,
headers=headers,
timeout=30,
)
assert response.status_code == HTTPStatus.BAD_REQUEST
response_data = response.json()
assert "error" in response_data
assert "message" in response_data["error"]
assert "Unknown storage policy `tiered`" in response_data["error"]["message"]
def test_set_custom_retention_ttl_duplicate_conditions(
signoz: types.SigNoz, get_jwt_token
):
"""Test that duplicate TTL conditions are rejected."""
payload = {
"type": "logs",
"defaultTTLDays": 30,
"ttlConditions": [
{
"conditions": [{"key": "service_name", "values": ["frontend"]}],
"ttlDays": 60,
},
{
"conditions": [
{
"key": "service_name",
"values": ["frontend"], # Duplicate condition
}
],
"ttlDays": 90,
},
],
"coldStorageVolume": "",
"coldStorageDuration": 0,
}
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v2/settings/ttl"),
json=payload,
headers=headers,
timeout=30,
)
# Should return error for duplicate conditions
assert response.status_code == HTTPStatus.BAD_REQUEST
def test_set_custom_retention_ttl_invalid_condition(
signoz: types.SigNoz, get_jwt_token
):
"""Test that conditions with empty values are rejected."""
payload = {
"type": "logs",
"defaultTTLDays": 30,
"ttlConditions": [
{
"conditions": [
{
"key": "service_name",
"values": [], # Empty values should be rejected
}
],
"ttlDays": 60,
}
],
"coldStorageVolume": "",
"coldStorageDuration": 0,
}
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v2/settings/ttl"),
json=payload,
headers=headers,
timeout=30,
)
# Should return error for empty condition values
assert response.status_code == HTTPStatus.BAD_REQUEST
def test_get_custom_retention_ttl(signoz: types.SigNoz, get_jwt_token, insert_logs):
"""Test getting custom retention TTL configuration."""
# First set a custom retention TTL
set_payload = {
"type": "logs",
"defaultTTLDays": 45,
"ttlConditions": [
{
"conditions": [{"key": "service_name", "values": ["test-service"]}],
"ttlDays": 90,
}
],
"coldStorageVolume": "",
"coldStorageDuration": 0,
}
# Insert some logs with these attribute keys
logs = [
Logs(resources={"service_name": "test-service"}, severity_text="ERROR"),
]
insert_logs(logs)
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
set_response = requests.post(
signoz.self.host_configs["8080"].get("/api/v2/settings/ttl"),
json=set_payload,
headers=headers,
timeout=30,
)
assert set_response.status_code == HTTPStatus.OK
# Allow some time for the TTL to be processed
time.sleep(2)
# Now get the TTL configuration
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
get_response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/settings/ttl"),
params={"type": "logs"},
headers=headers,
timeout=30,
)
response_data = get_response.json()
# Verify the response contains expected fields
assert response_data["status"] == "success"
assert response_data["default_ttl_days"] == 45
assert response_data["cold_storage_ttl_days"] == -1
assert response_data["ttl_conditions"][0]["ttlDays"] == 90
assert response_data["ttl_conditions"][0]["conditions"][0]["key"] == "service_name"
assert response_data["ttl_conditions"][0]["conditions"][0]["values"] == [
"test-service"
]
def test_get_ttl_traces_success(signoz: types.SigNoz, get_jwt_token):
"""Test getting TTL for traces."""
# First set a TTL configuration for traces
set_payload = {
"type": "traces",
"duration": "720h", # 30 days in hours
}
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
set_response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/settings/ttl"),
params=set_payload,
headers=headers,
timeout=30,
)
print(set_response.text)
assert set_response.status_code == HTTPStatus.OK
# Allow some time for the TTL to be processed
time.sleep(2)
# Now get the TTL configuration for traces
get_response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/settings/ttl"),
params={"type": "traces"},
headers=headers,
timeout=30,
)
assert get_response.status_code == HTTPStatus.OK
response_data = get_response.json()
# Verify the response contains expected fields and values
assert response_data["status"] == "success"
assert "traces_ttl_duration_hrs" in response_data
assert "traces_move_ttl_duration_hrs" in response_data
assert (
response_data["traces_ttl_duration_hrs"] == 720
) # Note: response is in hours as integer
assert (
response_data["traces_move_ttl_duration_hrs"] == -1
) # -1 indicates no cold storage configured
def test_large_ttl_conditions_list(signoz: types.SigNoz, get_jwt_token, insert_logs):
"""Test custom retention TTL with many conditions."""
# Create a list of many TTL conditions to test performance and limits
conditions = []
for i in range(10): # Test with 10 conditions
conditions.append(
{
"conditions": [{"key": "service_name", "values": [f"service-{i}"]}],
"ttlDays": 30 + (i * 10),
}
)
logs = [
Logs(resources={"service_name": f"service-{i}"}, severity_text="ERROR")
for i in range(10)
]
insert_logs(logs)
payload = {
"type": "logs",
"defaultTTLDays": 30,
"ttlConditions": conditions,
"coldStorageVolume": "",
"coldStorageDuration": 0,
}
headers = {
"Authorization": f"Bearer {get_jwt_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
}
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v2/settings/ttl"),
json=payload,
headers=headers,
timeout=30,
)
assert response.status_code == HTTPStatus.OK
response_data = response.json()
assert "message" in response_data