Compare commits

...

7 Commits

Author SHA1 Message Date
Karan Balani
a94f5b1cab chore: remove register api route tests as we moved to root user 2026-02-17 00:34:22 +05:30
Karan Balani
f1998e24be chore: fix tests 2026-02-17 00:08:37 +05:30
Karan Balani
e667b904d5 chore: ref editor user values 2026-02-16 22:37:48 +05:30
Karan Balani
2f6cd523d6 chore: fix lint 2026-02-16 22:34:48 +05:30
Karan Balani
fca4118151 chore: use root user email and password variables 2026-02-16 22:31:22 +05:30
Karan Balani
f07f62975b chore: fix use of direct admin emails to variables 2026-02-16 22:28:26 +05:30
Karan Balani
fcb668687f fix: integation tests for root user 2026-02-16 22:03:17 +05:30
35 changed files with 358 additions and 482 deletions

View File

@@ -1,4 +1,4 @@
FROM node:18-bullseye AS build
FROM node:22-bookworm AS build
WORKDIR /opt/
COPY ./frontend/ ./

View File

@@ -6,7 +6,7 @@ import pytest
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.logger import setup_logger
from fixtures.logs import Logs
from fixtures.metrics import Metrics
@@ -20,7 +20,7 @@ logger = setup_logger(__name__)
def create_alert_rule(
signoz: types.SigNoz, get_token: Callable[[str, str], str]
) -> Callable[[dict], str]:
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
rule_ids = []

View File

@@ -11,55 +11,56 @@ from wiremock.resources.mappings import (
WireMockMatchers,
)
from fixtures import dev, types
from fixtures import types
from fixtures.logger import setup_logger
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
logger = setup_logger(__name__)
USER_ADMIN_NAME = "admin"
USER_ADMIN_EMAIL = "admin@integration.test"
USER_ADMIN_PASSWORD = "password123Z$"
# USER_ADMIN_NAME = "admin"
# USER_ADMIN_EMAIL = "admin@integration.test"
# USER_ADMIN_PASSWORD = "password123Z$"
USER_EDITOR_NAME = "editor"
USER_EDITOR_EMAIL = "editor@integration.test"
USER_EDITOR_PASSWORD = "password123Z$"
@pytest.fixture(name="create_user_admin", scope="package")
def create_user_admin(
signoz: types.SigNoz, request: pytest.FixtureRequest, pytestconfig: pytest.Config
) -> types.Operation:
def create() -> None:
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/register"),
json={
"name": USER_ADMIN_NAME,
"orgName": "",
"email": USER_ADMIN_EMAIL,
"password": USER_ADMIN_PASSWORD,
},
timeout=5,
)
# @pytest.fixture(name="create_user_admin", scope="package")
# def create_user_admin(
# signoz: types.SigNoz, request: pytest.FixtureRequest, pytestconfig: pytest.Config
# ) -> types.Operation:
# def create() -> None:
# response = requests.post(
# signoz.self.host_configs["8080"].get("/api/v1/register"),
# json={
# "name": USER_ADMIN_NAME,
# "orgName": "",
# "email": USER_ADMIN_EMAIL,
# "password": USER_ADMIN_PASSWORD,
# },
# timeout=5,
# )
assert response.status_code == HTTPStatus.OK
# assert response.status_code == HTTPStatus.OK
return types.Operation(name="create_user_admin")
# return types.Operation(name="create_user_admin")
def delete(_: types.Operation) -> None:
pass
# def delete(_: types.Operation) -> None:
# pass
def restore(cache: dict) -> types.Operation:
return types.Operation(name=cache["name"])
# def restore(cache: dict) -> types.Operation:
# return types.Operation(name=cache["name"])
return dev.wrap(
request,
pytestconfig,
"create_user_admin",
lambda: types.Operation(name=""),
create,
delete,
restore,
)
# return dev.wrap(
# request,
# pytestconfig,
# "create_user_admin",
# lambda: types.Operation(name=""),
# create,
# delete,
# restore,
# )
@pytest.fixture(name="get_session_context", scope="function")
@@ -189,7 +190,7 @@ def add_license(
],
)
access_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
access_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.post(
url=signoz.self.host_configs["8080"].get("/api/v3/licenses"),

View File

@@ -7,7 +7,7 @@ import pytest
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.logger import setup_logger
logger = setup_logger(__name__)
@@ -73,7 +73,7 @@ def create_cloud_integration_account(
if created_account_id and cloud_provider_used:
get_token = request.getfixturevalue("get_token")
try:
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
r = _disconnect(admin_token, cloud_provider_used)
if r.status_code != HTTPStatus.OK:
logger.info(

View File

@@ -9,7 +9,7 @@ from testcontainers.core.container import Network
from wiremock.testing.testcontainer import WireMockContainer
from fixtures import dev, types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.logger import setup_logger
logger = setup_logger(__name__)
@@ -74,10 +74,9 @@ def notification_channel(
@pytest.fixture(name="create_webhook_notification_channel", scope="function")
def create_webhook_notification_channel(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> Callable[[str, str, dict, bool], str]:
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# function to create notification channel
def _create_webhook_notification_channel(

View File

@@ -15,6 +15,9 @@ from fixtures.logger import setup_logger
logger = setup_logger(__name__)
ROOT_USER_EMAIL = "root@integration.test"
ROOT_USER_PASSWORD = "Str0ngP@ssw0rd!"
@pytest.fixture(name="signoz", scope="package")
def signoz( # pylint: disable=too-many-arguments,too-many-positional-arguments
@@ -73,6 +76,9 @@ def signoz( # pylint: disable=too-many-arguments,too-many-positional-arguments
"SIGNOZ_ALERTMANAGER_SIGNOZ_POLL__INTERVAL": "5s",
"SIGNOZ_ALERTMANAGER_SIGNOZ_ROUTE_GROUP__WAIT": "1s",
"SIGNOZ_ALERTMANAGER_SIGNOZ_ROUTE_GROUP__INTERVAL": "5s",
"SIGNOZ_USER_ROOT_ENABLED": True,
"SIGNOZ_USER_ROOT_EMAIL": ROOT_USER_EMAIL,
"SIGNOZ_USER_ROOT_PASSWORD": ROOT_USER_PASSWORD,
}
| sqlstore.env
| clickhouse.env

View File

@@ -7,7 +7,7 @@ import requests
from wiremock.client import HttpMethods, Mapping, MappingRequest, MappingResponse
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.logger import setup_logger
logger = setup_logger(__name__)
@@ -65,7 +65,7 @@ def test_webhook_notification_channel(
time.sleep(10)
# Call test API for the notification channel
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.post(
url=signoz.self.host_configs["8080"].get("/api/v1/testChannel"),
json={

View File

@@ -35,7 +35,6 @@ def test_telemetry_databases_exist(signoz: types.SigNoz) -> None:
def test_teardown(
signoz: types.SigNoz, # pylint: disable=unused-argument
idp: types.TestContainerIDP, # pylint: disable=unused-argument
create_user_admin: types.Operation, # pylint: disable=unused-argument
migrator: types.Operation, # pylint: disable=unused-argument
) -> None:
pass

View File

@@ -3,16 +3,15 @@ from typing import Callable
import requests
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.types import Operation, SigNoz
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.types import SigNoz
def test_create_and_get_domain(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Get domains which should be an empty list
response = requests.get(
@@ -91,10 +90,9 @@ def test_create_and_get_domain(
def test_create_invalid(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a domain with type saml and body for oidc, this should fail because oidcConfig is not allowed for saml
response = requests.post(
@@ -173,11 +171,10 @@ def test_create_invalid(
def test_create_invalid_role_mapping(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
"""Test that invalid role mappings are rejected."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create domain with invalid defaultRole
response = requests.post(

View File

@@ -6,22 +6,18 @@ import requests
from selenium import webdriver
from wiremock.resources.mappings import Mapping
from fixtures.auth import (
USER_ADMIN_EMAIL,
USER_ADMIN_PASSWORD,
add_license,
)
from fixtures.auth import add_license
from fixtures.idputils import (
get_saml_domain,
get_user_by_email,
perform_saml_login,
)
from fixtures.types import Operation, SigNoz, TestContainerDocker, TestContainerIDP
from fixtures.types import SigNoz, TestContainerDocker, TestContainerIDP
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
def test_apply_license(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
make_http_mocks: Callable[[TestContainerDocker, List[Mapping]], None],
get_token: Callable[[str, str], str],
) -> None:
@@ -34,7 +30,6 @@ def test_create_auth_domain(
create_saml_client: Callable[[str, str], None],
update_saml_client_attributes: Callable[[str, Dict[str, Any]], None],
get_saml_settings: Callable[[], dict],
create_user_admin: Callable[[], None], # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
# Create a saml client in the idp.
@@ -44,7 +39,7 @@ def test_create_auth_domain(
settings = get_saml_settings()
# Create a auth domain in signoz.
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/domains"),
@@ -127,7 +122,7 @@ def test_saml_authn(
driver.get(url)
idp_login("viewer@saml.integration.test", "password")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Assert that the user was created in signoz.
response = requests.get(
@@ -178,7 +173,7 @@ def test_idp_initiated_saml_authn(
driver.get(idp_initiated_login_url)
idp_login("viewer.idp.initiated@saml.integration.test", "password")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Assert that the user was created in signoz.
response = requests.get(
@@ -208,7 +203,7 @@ def test_saml_update_domain_with_group_mappings(
get_token: Callable[[str, str], str],
get_saml_settings: Callable[[], dict],
) -> None:
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
domain = get_saml_domain(signoz, admin_token)
settings = get_saml_settings()
@@ -266,7 +261,7 @@ def test_saml_role_mapping_single_group_admin(
signoz, driver, get_session_context, idp_login, email, "password"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -292,7 +287,7 @@ def test_saml_role_mapping_single_group_editor(
signoz, driver, get_session_context, idp_login, email, "password"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -322,7 +317,7 @@ def test_saml_role_mapping_multiple_groups_highest_wins(
signoz, driver, get_session_context, idp_login, email, "password"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -349,7 +344,7 @@ def test_saml_role_mapping_explicit_viewer_group(
signoz, driver, get_session_context, idp_login, email, "password"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -375,7 +370,7 @@ def test_saml_role_mapping_unmapped_group_uses_default(
signoz, driver, get_session_context, idp_login, email, "password"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -390,7 +385,7 @@ def test_saml_update_domain_with_use_role_claim(
"""
Updates SAML domain to enable useRoleAttribute (direct role attribute).
"""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
domain = get_saml_domain(signoz, admin_token)
settings = get_saml_settings()
@@ -452,7 +447,7 @@ def test_saml_role_mapping_role_claim_takes_precedence(
signoz, driver, get_session_context, idp_login, email, "password"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -482,7 +477,7 @@ def test_saml_role_mapping_invalid_role_claim_fallback(
signoz, driver, get_session_context, idp_login, email, "password"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -512,7 +507,7 @@ def test_saml_role_mapping_case_insensitive(
signoz, driver, get_session_context, idp_login, email, "password"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -537,7 +532,7 @@ def test_saml_name_mapping(
signoz, driver, get_session_context, idp_login, email, "password"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -565,7 +560,7 @@ def test_saml_empty_name_fallback(
signoz, driver, get_session_context, idp_login, email, "password"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None

View File

@@ -6,22 +6,18 @@ import requests
from selenium import webdriver
from wiremock.resources.mappings import Mapping
from fixtures.auth import (
USER_ADMIN_EMAIL,
USER_ADMIN_PASSWORD,
add_license,
)
from fixtures.auth import add_license
from fixtures.idputils import (
get_oidc_domain,
get_user_by_email,
perform_oidc_login,
)
from fixtures.types import Operation, SigNoz, TestContainerDocker, TestContainerIDP
from fixtures.types import SigNoz, TestContainerDocker, TestContainerIDP
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
def test_apply_license(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
make_http_mocks: Callable[[TestContainerDocker, List[Mapping]], None],
get_token: Callable[[str, str], str],
) -> None:
@@ -36,7 +32,6 @@ def test_create_auth_domain(
idp: TestContainerIDP, # pylint: disable=unused-argument
create_oidc_client: Callable[[str, str], None],
get_oidc_settings: Callable[[], dict],
create_user_admin: Callable[[], None], # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""
@@ -50,7 +45,7 @@ def test_create_auth_domain(
settings = get_oidc_settings(client_id)
# Create a auth domain in signoz.
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/domains"),
@@ -109,7 +104,7 @@ def test_oidc_authn(
driver.get(actual_url)
idp_login("viewer@oidc.integration.test", "password123")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Assert that the user was created in signoz.
response = requests.get(
@@ -143,7 +138,7 @@ def test_oidc_update_domain_with_group_mappings(
"""
Updates OIDC domain to add role mapping with group mappings and claim mapping.
"""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
domain = get_oidc_domain(signoz, admin_token)
client_id = f"oidc.integration.test.{signoz.self.host_configs['8080'].address}:{signoz.self.host_configs['8080'].port}"
settings = get_oidc_settings(client_id)
@@ -204,7 +199,7 @@ def test_oidc_role_mapping_single_group_admin(
signoz, idp, driver, get_session_context, idp_login, email, "password123"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -230,7 +225,7 @@ def test_oidc_role_mapping_single_group_editor(
signoz, idp, driver, get_session_context, idp_login, email, "password123"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -260,7 +255,7 @@ def test_oidc_role_mapping_multiple_groups_highest_wins(
signoz, idp, driver, get_session_context, idp_login, email, "password123"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -287,7 +282,7 @@ def test_oidc_role_mapping_explicit_viewer_group(
signoz, idp, driver, get_session_context, idp_login, email, "password123"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -313,7 +308,7 @@ def test_oidc_role_mapping_unmapped_group_uses_default(
signoz, idp, driver, get_session_context, idp_login, email, "password123"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -329,7 +324,7 @@ def test_oidc_update_domain_with_use_role_claim(
"""
Updates OIDC domain to enable useRoleClaim.
"""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
domain = get_oidc_domain(signoz, admin_token)
client_id = f"oidc.integration.test.{signoz.self.host_configs['8080'].address}:{signoz.self.host_configs['8080'].port}"
settings = get_oidc_settings(client_id)
@@ -393,7 +388,7 @@ def test_oidc_role_mapping_role_claim_takes_precedence(
signoz, idp, driver, get_session_context, idp_login, email, "password123"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -425,7 +420,7 @@ def test_oidc_role_mapping_invalid_role_claim_fallback(
signoz, idp, driver, get_session_context, idp_login, email, "password123"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -455,7 +450,7 @@ def test_oidc_role_mapping_case_insensitive(
signoz, idp, driver, get_session_context, idp_login, email, "password123"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
@@ -481,7 +476,7 @@ def test_oidc_name_mapping(
signoz, idp, driver, get_session_context, idp_login, email, "password123"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
headers={"Authorization": f"Bearer {admin_token}"},
@@ -517,7 +512,7 @@ def test_oidc_empty_name_uses_fallback(
signoz, idp, driver, get_session_context, idp_login, email, "password123"
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
headers={"Authorization": f"Bearer {admin_token}"},

View File

@@ -11,21 +11,21 @@ from wiremock.client import (
)
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD, add_license
from fixtures.auth import add_license
from fixtures.logger import setup_logger
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
logger = setup_logger(__name__)
def test_generate_connection_params(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
make_http_mocks: Callable[[types.TestContainerDocker, list], None],
get_token: Callable[[str, str], str],
) -> None:
"""Test to generate connection parameters for AWS SigNoz cloud integration."""
# Get authentication token for admin user
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
add_license(signoz, make_http_mocks, get_token)

View File

@@ -4,7 +4,7 @@ from typing import Callable
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.logger import setup_logger
logger = setup_logger(__name__)
@@ -12,13 +12,12 @@ logger = setup_logger(__name__)
def test_generate_connection_url(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test to generate connection URL for AWS CloudFormation stack deployment."""
# Get authentication token for admin user
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
cloud_provider = "aws"
endpoint = (
f"/api/v1/cloud-integrations/{cloud_provider}/accounts/generate-connection-url"
@@ -101,12 +100,11 @@ def test_generate_connection_url(
def test_generate_connection_url_unsupported_provider(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test that unsupported cloud providers return an error."""
# Get authentication token for admin user
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Try with GCP (unsupported)
cloud_provider = "gcp"

View File

@@ -5,7 +5,7 @@ from typing import Callable
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.cloudintegrationsutils import simulate_agent_checkin
from fixtures.logger import setup_logger
@@ -14,11 +14,10 @@ logger = setup_logger(__name__)
def test_list_connected_accounts_empty(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test listing connected accounts when there are none."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
cloud_provider = "aws"
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/accounts"
@@ -43,12 +42,11 @@ def test_list_connected_accounts_empty(
def test_list_connected_accounts_with_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test listing connected accounts after creating one."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a test account
cloud_provider = "aws"
@@ -88,12 +86,11 @@ def test_list_connected_accounts_with_account(
def test_get_account_status(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test getting the status of a specific account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a test account (no check-in needed for status check)
cloud_provider = "aws"
account_data = create_cloud_integration_account(admin_token, cloud_provider)
@@ -123,11 +120,10 @@ def test_get_account_status(
def test_get_account_status_not_found(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test getting status for a non-existent account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
cloud_provider = "aws"
fake_account_id = "00000000-0000-0000-0000-000000000000"
@@ -147,12 +143,11 @@ def test_get_account_status_not_found(
def test_update_account_config(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test updating account configuration."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a test account
cloud_provider = "aws"
@@ -212,12 +207,11 @@ def test_update_account_config(
def test_disconnect_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test disconnecting an account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a test account
cloud_provider = "aws"
@@ -267,11 +261,10 @@ def test_disconnect_account(
def test_disconnect_account_not_found(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test disconnecting a non-existent account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
cloud_provider = "aws"
fake_account_id = "00000000-0000-0000-0000-000000000000"
@@ -291,12 +284,11 @@ def test_disconnect_account_not_found(
def test_list_accounts_unsupported_provider(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test listing accounts for an unsupported cloud provider."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
cloud_provider = "gcp" # Unsupported provider
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/accounts"

View File

@@ -5,7 +5,7 @@ from typing import Callable
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.cloudintegrationsutils import simulate_agent_checkin
from fixtures.logger import setup_logger
@@ -14,11 +14,10 @@ logger = setup_logger(__name__)
def test_list_services_without_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test listing available services without specifying an account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
cloud_provider = "aws"
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services"
@@ -48,12 +47,11 @@ def test_list_services_without_account(
def test_list_services_with_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test listing services for a specific connected account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a test account and do check-in
cloud_provider = "aws"
@@ -93,11 +91,10 @@ def test_list_services_with_account(
def test_get_service_details_without_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test getting service details without specifying an account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
cloud_provider = "aws"
# First get the list of services to get a valid service ID
@@ -139,12 +136,11 @@ def test_get_service_details_without_account(
def test_get_service_details_with_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test getting service details for a specific connected account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a test account and do check-in
cloud_provider = "aws"
@@ -195,11 +191,10 @@ def test_get_service_details_with_account(
def test_get_service_details_invalid_service(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test getting details for a non-existent service."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
cloud_provider = "aws"
fake_service_id = "non-existent-service"
@@ -218,11 +213,10 @@ def test_get_service_details_invalid_service(
def test_list_services_unsupported_provider(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test listing services for an unsupported cloud provider."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
cloud_provider = "gcp" # Unsupported provider
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services"
@@ -240,12 +234,11 @@ def test_list_services_unsupported_provider(
def test_update_service_config(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test updating service configuration for a connected account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a test account and do check-in
cloud_provider = "aws"
@@ -306,11 +299,10 @@ def test_update_service_config(
def test_update_service_config_without_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test updating service config without a connected account should fail."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
cloud_provider = "aws"
@@ -352,12 +344,11 @@ def test_update_service_config_without_account(
def test_update_service_config_invalid_service(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test updating config for a non-existent service should fail."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a test account and do check-in
cloud_provider = "aws"
@@ -396,12 +387,11 @@ def test_update_service_config_invalid_service(
def test_update_service_config_disable_service(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test disabling a service by updating config with enabled=false."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a test account and do check-in
cloud_provider = "aws"

View File

@@ -4,16 +4,16 @@ from typing import Callable, List
import requests
from wiremock.resources.mappings import Mapping
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD, add_license
from fixtures.types import Operation, SigNoz, TestContainerDocker
from fixtures.auth import add_license
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.types import SigNoz, TestContainerDocker
def test_create_and_delete_dashboard_without_license(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/dashboards"),
@@ -38,7 +38,6 @@ def test_create_and_delete_dashboard_without_license(
def test_apply_license(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
make_http_mocks: Callable[[TestContainerDocker, List[Mapping]], None],
get_token: Callable[[str, str], str],
) -> None:
@@ -50,10 +49,9 @@ def test_apply_license(
def test_create_and_delete_dashboard_with_license(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/dashboards"),

View File

@@ -6,13 +6,13 @@ import requests
from sqlalchemy import sql
from wiremock.resources.mappings import Mapping
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD, add_license
from fixtures.types import Operation, SigNoz, TestContainerDocker
from fixtures.auth import add_license
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.types import SigNoz, TestContainerDocker
def test_apply_license(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
make_http_mocks: Callable[[TestContainerDocker, List[Mapping]], None],
get_token: Callable[[str, str], str],
) -> None:
@@ -24,10 +24,9 @@ def test_apply_license(
def test_create_and_get_public_dashboard(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/dashboards"),
@@ -72,10 +71,9 @@ def test_create_and_get_public_dashboard(
def test_public_dashboard_widget_query_range(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
dashboard_req = {
"title": "Test Widget Query Range Dashboard",
@@ -196,13 +194,12 @@ def test_public_dashboard_widget_query_range(
def test_anonymous_role_has_public_dashboard_permission(
request: pytest.FixtureRequest,
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
"""
Verify that the signoz-anonymous role has the public-dashboard/* permission.
"""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Get the roles to find the org_id
response = requests.get(

View File

@@ -11,12 +11,11 @@ from typing import Callable
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
def test_create_logs_pipeline_success(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""
@@ -29,7 +28,7 @@ def test_create_logs_pipeline_success(
3. Verify the response contains version information
4. Verify the pipeline is returned in the response
"""
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
pipeline_payload = {
"pipelines": [
@@ -105,7 +104,6 @@ def test_create_logs_pipeline_success(
def test_list_logs_pipelines_success(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""
@@ -117,7 +115,7 @@ def test_list_logs_pipelines_success(
2. List all pipelines and verify the created pipeline is present
3. Verify the response structure
"""
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a pipeline first
create_payload = {
@@ -194,7 +192,6 @@ def test_list_logs_pipelines_success(
def test_list_logs_pipelines_by_version(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""
@@ -207,7 +204,7 @@ def test_list_logs_pipelines_by_version(
3. List pipelines by version 1 and verify it returns the original
4. List pipelines by version 2 and verify it returns the updated version
"""
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create version 1
v1_payload = {
@@ -356,7 +353,6 @@ def test_list_logs_pipelines_by_version(
def test_preview_logs_pipelines_success(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""
@@ -368,7 +364,7 @@ def test_preview_logs_pipelines_success(
2. Verify the preview processes logs correctly
3. Verify the response contains processed logs
"""
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
preview_payload = {
"pipelines": [
@@ -448,7 +444,6 @@ def test_preview_logs_pipelines_success(
def test_create_multiple_pipelines_success(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""
@@ -460,7 +455,7 @@ def test_create_multiple_pipelines_success(
2. Verify all pipelines are created
3. Verify pipelines are ordered correctly
"""
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
multi_pipeline_payload = {
"pipelines": [
@@ -562,7 +557,6 @@ def test_create_multiple_pipelines_success(
def test_delete_all_pipelines_success(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""
@@ -575,7 +569,7 @@ def test_delete_all_pipelines_success(
3. Verify pipelines are deleted
4. Verify new version is created
"""
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a pipeline first
create_payload = {

View File

@@ -5,100 +5,102 @@ import requests
from fixtures import types
from fixtures.logger import setup_logger
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.auth import USER_EDITOR_EMAIL, USER_EDITOR_PASSWORD, USER_EDITOR_NAME
logger = setup_logger(__name__)
def test_register_with_invalid_input(signoz: types.SigNoz) -> None:
"""
Test the register endpoint with invalid input.
1. Invalid Password
2. Invalid Email
"""
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/register"),
json={
"name": "admin",
"orgId": "",
"orgName": "integration.test",
"email": "admin@integration.test",
"password": "password", # invalid password
},
timeout=2,
)
# def test_register_with_invalid_input(signoz: types.SigNoz) -> None:
# """
# Test the register endpoint with invalid input.
# 1. Invalid Password
# 2. Invalid Email
# """
# response = requests.post(
# signoz.self.host_configs["8080"].get("/api/v1/register"),
# json={
# "name": "admin",
# "orgId": "",
# "orgName": "integration.test",
# "email": "admin@integration.test",
# "password": "password", # invalid password
# },
# timeout=2,
# )
assert response.status_code == HTTPStatus.BAD_REQUEST
# assert response.status_code == HTTPStatus.BAD_REQUEST
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/register"),
json={
"name": "admin",
"orgId": "",
"orgName": "integration.test",
"email": "admin", # invalid email
"password": "password123Z$",
},
timeout=2,
)
# response = requests.post(
# signoz.self.host_configs["8080"].get("/api/v1/register"),
# json={
# "name": "admin",
# "orgId": "",
# "orgName": "integration.test",
# "email": "admin", # invalid email
# "password": "password123Z$",
# },
# timeout=2,
# )
assert response.status_code == HTTPStatus.BAD_REQUEST
# assert response.status_code == HTTPStatus.BAD_REQUEST
def test_register(signoz: types.SigNoz, get_token: Callable[[str, str], str]) -> None:
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/version"), timeout=2
)
# def test_register(signoz: types.SigNoz, get_token: Callable[[str, str], str]) -> None:
# response = requests.get(
# signoz.self.host_configs["8080"].get("/api/v1/version"), timeout=2
# )
assert response.status_code == HTTPStatus.OK
assert response.json()["setupCompleted"] is False
# assert response.status_code == HTTPStatus.OK
# assert response.json()["setupCompleted"] is False
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/register"),
json={
"name": "admin",
"orgId": "",
"orgName": "integration.test",
"email": "admin@integration.test",
"password": "password123Z$",
},
timeout=2,
)
assert response.status_code == HTTPStatus.OK
# response = requests.post(
# signoz.self.host_configs["8080"].get("/api/v1/register"),
# json={
# "name": "admin",
# "orgId": "",
# "orgName": "integration.test",
# "email": "admin@integration.test",
# "password": "password123Z$",
# },
# timeout=2,
# )
# assert response.status_code == HTTPStatus.OK
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/version"), timeout=2
)
# response = requests.get(
# signoz.self.host_configs["8080"].get("/api/v1/version"), timeout=2
# )
assert response.status_code == HTTPStatus.OK
assert response.json()["setupCompleted"] is True
# assert response.status_code == HTTPStatus.OK
# assert response.json()["setupCompleted"] is True
admin_token = get_token("admin@integration.test", "password123Z$")
# admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
# response = requests.get(
# signoz.self.host_configs["8080"].get("/api/v1/user"),
# timeout=2,
# headers={"Authorization": f"Bearer {admin_token}"},
# )
assert response.status_code == HTTPStatus.OK
# assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(user for user in user_response if user["email"] == "admin@integration.test"),
None,
)
# user_response = response.json()["data"]
# found_user = next(
# (user for user in user_response if user["email"] == "admin@integration.test"),
# None,
# )
assert found_user is not None
assert found_user["role"] == "ADMIN"
# assert found_user is not None
# assert found_user["role"] == "ADMIN"
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/user/{found_user["id"]}"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
# response = requests.get(
# signoz.self.host_configs["8080"].get(f"/api/v1/user/{found_user['id']}"),
# timeout=2,
# headers={"Authorization": f"Bearer {admin_token}"},
# )
assert response.status_code == HTTPStatus.OK
assert response.json()["data"]["role"] == "ADMIN"
# assert response.status_code == HTTPStatus.OK
# assert response.json()["data"]["role"] == "ADMIN"
def test_invite_and_register(
@@ -107,10 +109,10 @@ def test_invite_and_register(
# Generate an invite token for the editor user
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/invite"),
json={"email": "editor@integration.test", "role": "EDITOR", "name": "editor"},
json={"email": USER_EDITOR_EMAIL, "role": "EDITOR", "name": USER_EDITOR_NAME},
timeout=2,
headers={
"Authorization": f"Bearer {get_token("admin@integration.test", "password123Z$")}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
},
)
@@ -120,7 +122,7 @@ def test_invite_and_register(
signoz.self.host_configs["8080"].get("/api/v1/invite"),
timeout=2,
headers={
"Authorization": f"Bearer {get_token("admin@integration.test", "password123Z$")}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
},
)
@@ -129,7 +131,7 @@ def test_invite_and_register(
(
invite
for invite in invite_response
if invite["email"] == "editor@integration.test"
if invite["email"] == USER_EDITOR_EMAIL
),
None,
)
@@ -138,8 +140,8 @@ def test_invite_and_register(
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/invite/accept"),
json={
"password": "password123Z$",
"displayName": "editor",
"password": USER_EDITOR_PASSWORD,
"displayName": USER_EDITOR_NAME,
"token": f"{found_invite['token']}",
},
timeout=2,
@@ -159,7 +161,7 @@ def test_invite_and_register(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={
"Authorization": f"Bearer {get_token("editor@integration.test", "password123Z$")}"
"Authorization": f"Bearer {get_token(USER_EDITOR_EMAIL, USER_EDITOR_PASSWORD)}"
},
)
@@ -170,7 +172,7 @@ def test_invite_and_register(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={
"Authorization": f"Bearer {get_token("admin@integration.test", "password123Z$")}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
},
)
@@ -178,20 +180,20 @@ def test_invite_and_register(
user_response = response.json()["data"]
found_user = next(
(user for user in user_response if user["email"] == "editor@integration.test"),
(user for user in user_response if user["email"] == USER_EDITOR_EMAIL),
None,
)
assert found_user is not None
assert found_user["role"] == "EDITOR"
assert found_user["displayName"] == "editor"
assert found_user["email"] == "editor@integration.test"
assert found_user["displayName"] == USER_EDITOR_NAME
assert found_user["email"] == USER_EDITOR_EMAIL
def test_revoke_invite_and_register(
signoz: types.SigNoz, get_token: Callable[[str, str], str]
) -> None:
admin_token = get_token("admin@integration.test", "password123Z$")
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Generate an invite token for the viewer user
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/invite"),
@@ -206,7 +208,7 @@ def test_revoke_invite_and_register(
signoz.self.host_configs["8080"].get("/api/v1/invite"),
timeout=2,
headers={
"Authorization": f"Bearer {get_token("admin@integration.test", "password123Z$")}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
},
)
@@ -234,7 +236,7 @@ def test_revoke_invite_and_register(
json={
"password": "password123Z$",
"displayName": "viewer",
"token": f"{found_invite["token"]}",
"token": f"{found_invite['token']}",
},
timeout=2,
)
@@ -245,7 +247,7 @@ def test_revoke_invite_and_register(
def test_self_access(
signoz: types.SigNoz, get_token: Callable[[str, str], str]
) -> None:
admin_token = get_token("admin@integration.test", "password123Z$")
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
@@ -257,7 +259,7 @@ def test_self_access(
user_response = response.json()["data"]
found_user = next(
(user for user in user_response if user["email"] == "editor@integration.test"),
(user for user in user_response if user["email"] == USER_EDITOR_EMAIL),
None,
)

View File

@@ -11,6 +11,7 @@ from wiremock.client import (
)
from fixtures import types
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
def test_apply_license(
@@ -56,7 +57,7 @@ def test_apply_license(
],
)
access_token = get_token("admin@integration.test", "password123Z$")
access_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.post(
url=signoz.self.host_configs["8080"].get("/api/v3/licenses"),
@@ -119,7 +120,7 @@ def test_refresh_license(
],
)
access_token = get_token("admin@integration.test", "password123Z$")
access_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.put(
url=signoz.self.host_configs["8080"].get("/api/v3/licenses"),
@@ -176,7 +177,7 @@ def test_license_checkout(
],
)
access_token = get_token("admin@integration.test", "password123Z$")
access_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.post(
url=signoz.self.host_configs["8080"].get("/api/v1/checkout"),
@@ -227,7 +228,7 @@ def test_license_portal(
],
)
access_token = get_token("admin@integration.test", "password123Z$")
access_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.post(
url=signoz.self.host_configs["8080"].get("/api/v1/portal"),

View File

@@ -4,10 +4,11 @@ from typing import Callable
import requests
from fixtures import types
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
def test_api_key(signoz: types.SigNoz, get_token: Callable[[str, str], str]) -> None:
admin_token = get_token("admin@integration.test", "password123Z$")
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/pats"),
@@ -28,7 +29,7 @@ def test_api_key(signoz: types.SigNoz, get_token: Callable[[str, str], str]) ->
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"SIGNOZ-API-KEY": f"{pat_response["data"]["token"]}"},
headers={"SIGNOZ-API-KEY": f"{pat_response['data']['token']}"},
)
assert response.status_code == HTTPStatus.OK
@@ -38,14 +39,14 @@ def test_api_key(signoz: types.SigNoz, get_token: Callable[[str, str], str]) ->
(
user
for user in user_response["data"]
if user["email"] == "admin@integration.test"
if user["email"] == ROOT_USER_EMAIL
),
None,
)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/pats"),
headers={"SIGNOZ-API-KEY": f"{pat_response["data"]["token"]}"},
headers={"SIGNOZ-API-KEY": f"{pat_response['data']['token']}"},
timeout=2,
)

View File

@@ -6,6 +6,7 @@ from sqlalchemy import sql
from fixtures import types
from fixtures.logger import setup_logger
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
logger = setup_logger(__name__)
@@ -13,7 +14,7 @@ logger = setup_logger(__name__)
def test_change_password(
signoz: types.SigNoz, get_token: Callable[[str, str], str]
) -> None:
admin_token = get_token("admin@integration.test", "password123Z$")
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create another admin user
response = requests.post(
@@ -130,7 +131,7 @@ def test_change_password(
def test_reset_password(
signoz: types.SigNoz, get_token: Callable[[str, str], str]
) -> None:
admin_token = get_token("admin@integration.test", "password123Z$")
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Get the user id for admin+password@integration.test
response = requests.get(
@@ -188,7 +189,7 @@ def test_reset_password(
def test_reset_password_with_no_password(
signoz: types.SigNoz, get_token: Callable[[str, str], str]
) -> None:
admin_token = get_token("admin@integration.test", "password123Z$")
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Get the user id for admin+password@integration.test
response = requests.get(
@@ -253,7 +254,7 @@ def test_forgot_password_returns_204_for_nonexistent_email(
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/sessions/context"),
params={
"email": "admin@integration.test",
"email": ROOT_USER_EMAIL,
"ref": f"{signoz.self.host_configs['8080'].base()}",
},
timeout=5,
@@ -286,7 +287,7 @@ def test_forgot_password_creates_reset_token(
3. Use the token to reset password
4. Verify user can login with new password
"""
admin_token = get_token("admin@integration.test", "password123Z$")
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a user specifically for testing forgot password
response = requests.post(
@@ -418,7 +419,7 @@ def test_reset_password_with_expired_token(
"""
Test that resetting password with an expired token fails.
"""
admin_token = get_token("admin@integration.test", "password123Z$")
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Get user ID for the forgot@integration.test user
response = requests.get(

View File

@@ -4,6 +4,7 @@ from typing import Callable, Tuple
import requests
from fixtures import types
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
def test_change_role(
@@ -11,7 +12,7 @@ def test_change_role(
get_token: Callable[[str, str], str],
get_tokens: Callable[[str, str], Tuple[str, str]],
):
admin_token = get_token("admin@integration.test", "password123Z$")
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Create a new user as VIEWER
response = requests.post(

View File

@@ -4,7 +4,7 @@ from typing import Callable
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.logger import setup_logger
logger = setup_logger(__name__)
@@ -12,10 +12,9 @@ logger = setup_logger(__name__)
def test_get_user_preference(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user/preferences"),
@@ -29,10 +28,9 @@ def test_get_user_preference(
def test_get_set_user_preference_by_name(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# preference does not exist
response = requests.get(

View File

@@ -4,7 +4,7 @@ from typing import Callable
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.logger import setup_logger
logger = setup_logger(__name__)
@@ -12,10 +12,9 @@ logger = setup_logger(__name__)
def test_get_org_preference(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/org/preferences"),
@@ -29,10 +28,9 @@ def test_get_org_preference(
def test_get_set_org_preference_by_name(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# preference does not exist
response = requests.get(

View File

@@ -6,7 +6,7 @@ import pytest
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.logs import Logs
from fixtures.querier import (
assert_minutely_bucket_values,
@@ -19,7 +19,6 @@ from src.querier.util import assert_identical_query_response
def test_logs_list(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -84,7 +83,7 @@ def test_logs_list(
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Query Logs for the last 10 seconds and check if the logs are returned in the correct order
response = requests.post(
@@ -401,7 +400,6 @@ def test_logs_list(
def test_logs_list_with_corrupt_data(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -470,7 +468,7 @@ def test_logs_list_with_corrupt_data(
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Query Logs for the last 10 seconds and check if the logs are returned in the correct order
response = requests.post(
@@ -583,7 +581,6 @@ def test_logs_list_with_corrupt_data(
)
def test_logs_list_with_order_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
order_by_context: str,
@@ -613,7 +610,7 @@ def test_logs_list_with_order_by(
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
query = {
"type": "builder_query",
@@ -685,7 +682,6 @@ def test_logs_list_with_order_by(
def test_logs_time_series_count(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -788,7 +784,7 @@ def test_logs_time_series_count(
)
insert_logs(logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# count() of all logs for the last 5 minutes
response = requests.post(
@@ -1226,7 +1222,6 @@ def test_logs_time_series_count(
def test_datatype_collision(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -1332,7 +1327,7 @@ def test_datatype_collision(
insert_logs(logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# count() of all logs for the where severity_number > '7'
response = requests.post(
@@ -1661,7 +1656,6 @@ def test_datatype_collision(
def test_logs_fill_gaps(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -1687,7 +1681,7 @@ def test_logs_fill_gaps(
]
insert_logs(logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -1747,7 +1741,6 @@ def test_logs_fill_gaps(
def test_logs_fill_gaps_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -1773,7 +1766,7 @@ def test_logs_fill_gaps_with_group_by(
]
insert_logs(logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -1848,7 +1841,6 @@ def test_logs_fill_gaps_with_group_by(
def test_logs_fill_gaps_formula(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -1874,7 +1866,7 @@ def test_logs_fill_gaps_formula(
]
insert_logs(logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -1955,7 +1947,6 @@ def test_logs_fill_gaps_formula(
def test_logs_fill_gaps_formula_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -1981,7 +1972,7 @@ def test_logs_fill_gaps_formula_with_group_by(
]
insert_logs(logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -2083,7 +2074,6 @@ def test_logs_fill_gaps_formula_with_group_by(
def test_logs_fill_zero(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -2102,7 +2092,7 @@ def test_logs_fill_zero(
]
insert_logs(logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -2159,7 +2149,6 @@ def test_logs_fill_zero(
def test_logs_fill_zero_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -2185,7 +2174,7 @@ def test_logs_fill_zero_with_group_by(
]
insert_logs(logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -2259,7 +2248,6 @@ def test_logs_fill_zero_with_group_by(
def test_logs_fill_zero_formula(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -2285,7 +2273,7 @@ def test_logs_fill_zero_formula(
]
insert_logs(logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -2367,7 +2355,6 @@ def test_logs_fill_zero_formula(
def test_logs_fill_zero_formula_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -2393,7 +2380,7 @@ def test_logs_fill_zero_formula_with_group_by(
]
insert_logs(logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)

View File

@@ -6,13 +6,12 @@ from typing import Callable, List
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.logs import Logs
def test_logs_json_body_simple_searches(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -86,7 +85,7 @@ def test_logs_json_body_simple_searches(
]
)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Test 1: Search by body.message = "User logged in successfully"
response = requests.post(
@@ -301,7 +300,6 @@ def test_logs_json_body_simple_searches(
def test_logs_json_body_nested_keys(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -394,7 +392,7 @@ def test_logs_json_body_nested_keys(
]
)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Test 1: Search by body.user.name = "john_doe"
response = requests.post(
@@ -571,7 +569,6 @@ def test_logs_json_body_nested_keys(
def test_logs_json_body_array_membership(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -648,7 +645,7 @@ def test_logs_json_body_array_membership(
]
)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Test 1: Search by has(body.tags[*], "production")
response = requests.post(
@@ -779,7 +776,6 @@ def test_logs_json_body_array_membership(
def test_logs_json_body_listing(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
@@ -877,7 +873,7 @@ def test_logs_json_body_listing(
]
)
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Test 1: List all logs with JSON bodies
response = requests.post(

View File

@@ -5,7 +5,7 @@ from typing import Callable, Dict, List
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.metrics import Metrics
from fixtures.querier import (
assert_minutely_bucket_values,
@@ -16,7 +16,6 @@ from fixtures.querier import (
def test_metrics_fill_gaps(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
@@ -46,7 +45,7 @@ def test_metrics_fill_gaps(
]
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -110,7 +109,6 @@ def test_metrics_fill_gaps(
def test_metrics_fill_gaps_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
@@ -139,7 +137,7 @@ def test_metrics_fill_gaps_with_group_by(
]
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -219,7 +217,6 @@ def test_metrics_fill_gaps_with_group_by(
def test_metrics_fill_gaps_formula(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
@@ -249,7 +246,7 @@ def test_metrics_fill_gaps_formula(
]
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -342,7 +339,6 @@ def test_metrics_fill_gaps_formula(
def test_metrics_fill_gaps_formula_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
@@ -385,7 +381,7 @@ def test_metrics_fill_gaps_formula_with_group_by(
]
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -498,7 +494,6 @@ def test_metrics_fill_gaps_formula_with_group_by(
def test_metrics_fill_zero(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
@@ -520,7 +515,7 @@ def test_metrics_fill_zero(
]
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -583,7 +578,6 @@ def test_metrics_fill_zero(
def test_metrics_fill_zero_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
@@ -611,7 +605,7 @@ def test_metrics_fill_zero_with_group_by(
]
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -691,7 +685,6 @@ def test_metrics_fill_zero_with_group_by(
def test_metrics_fill_zero_formula(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
@@ -720,7 +713,7 @@ def test_metrics_fill_zero_formula(
]
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -813,7 +806,6 @@ def test_metrics_fill_zero_formula(
def test_metrics_fill_zero_formula_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
@@ -856,7 +848,7 @@ def test_metrics_fill_zero_formula_with_group_by(
]
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)

View File

@@ -6,7 +6,7 @@ import pytest
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.querier import (
assert_minutely_bucket_values,
find_named_result,
@@ -23,7 +23,6 @@ from src.querier.util import (
def test_traces_list(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
@@ -150,7 +149,7 @@ def test_traces_list(
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Query all traces for the past 5 minutes
response = requests.post(
@@ -662,7 +661,6 @@ def test_traces_list(
)
def test_traces_list_with_corrupt_data(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
payload: Dict[str, Any],
@@ -680,7 +678,7 @@ def test_traces_list_with_corrupt_data(
# 4 Traces with corrupt metadata inserted
# traces[i] occured before traces[j] where i < j
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_query_request(
signoz,
@@ -766,7 +764,6 @@ def test_traces_list_with_corrupt_data(
)
def test_traces_aggergate_order_by_count(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
order_by: Dict[str, str],
@@ -893,7 +890,7 @@ def test_traces_aggergate_order_by_count(
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
query = {
"type": "builder_query",
@@ -937,7 +934,6 @@ def test_traces_aggergate_order_by_count(
def test_traces_aggregate_with_mixed_field_selectors(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
@@ -1061,7 +1057,7 @@ def test_traces_aggregate_with_mixed_field_selectors(
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
query = {
"type": "builder_query",
@@ -1114,7 +1110,6 @@ def test_traces_aggregate_with_mixed_field_selectors(
def test_traces_fill_gaps(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
@@ -1141,7 +1136,7 @@ def test_traces_fill_gaps(
]
insert_traces(traces)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -1198,7 +1193,6 @@ def test_traces_fill_gaps(
def test_traces_fill_gaps_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
@@ -1237,7 +1231,7 @@ def test_traces_fill_gaps_with_group_by(
]
insert_traces(traces)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -1311,7 +1305,6 @@ def test_traces_fill_gaps_with_group_by(
def test_traces_fill_gaps_formula(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
@@ -1350,7 +1343,7 @@ def test_traces_fill_gaps_formula(
]
insert_traces(traces)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -1431,7 +1424,6 @@ def test_traces_fill_gaps_formula(
def test_traces_fill_gaps_formula_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
@@ -1470,7 +1462,7 @@ def test_traces_fill_gaps_formula_with_group_by(
]
insert_traces(traces)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -1572,7 +1564,6 @@ def test_traces_fill_gaps_formula_with_group_by(
def test_traces_fill_zero(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
@@ -1598,7 +1589,7 @@ def test_traces_fill_zero(
]
insert_traces(traces)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -1655,7 +1646,6 @@ def test_traces_fill_zero(
def test_traces_fill_zero_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
@@ -1694,7 +1684,7 @@ def test_traces_fill_zero_with_group_by(
]
insert_traces(traces)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -1769,7 +1759,6 @@ def test_traces_fill_zero_with_group_by(
def test_traces_fill_zero_formula(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
@@ -1808,7 +1797,7 @@ def test_traces_fill_zero_formula(
]
insert_traces(traces)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
@@ -1889,7 +1878,6 @@ def test_traces_fill_zero_formula(
def test_traces_fill_zero_formula_with_group_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
@@ -1928,7 +1916,7 @@ def test_traces_fill_zero_formula_with_group_by(
]
insert_traces(traces)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)

View File

@@ -8,7 +8,7 @@ from http import HTTPStatus
from typing import Any, Callable, List
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.metrics import Metrics
from fixtures.querier import (
build_builder_query,
@@ -23,7 +23,6 @@ CUMULATIVE_COUNTERS_FILE = os.path.join(TESTDATA_DIR, "cumulative_counters_1h.js
def test_rate_with_steady_values_and_reset(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
@@ -39,7 +38,7 @@ def test_rate_with_steady_values_and_reset(
)
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
query = build_builder_query(
"A",
metric_name,
@@ -73,7 +72,6 @@ def test_rate_with_steady_values_and_reset(
def test_rate_group_by_endpoint(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
@@ -89,7 +87,7 @@ def test_rate_group_by_endpoint(
)
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
query = build_builder_query(
"A",
metric_name,

View File

@@ -5,7 +5,7 @@ from typing import Callable, Dict, List, Optional, Tuple
import requests
from fixtures import querier, types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.logs import Logs
from fixtures.metrics import Metrics
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
@@ -210,14 +210,13 @@ def build_metrics_query(
def test_logs_scalar_group_by_single_agg_no_order(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(generate_logs_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -238,14 +237,13 @@ def test_logs_scalar_group_by_single_agg_no_order(
def test_logs_scalar_group_by_single_agg_order_by_agg_asc(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(generate_logs_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -266,14 +264,13 @@ def test_logs_scalar_group_by_single_agg_order_by_agg_asc(
def test_logs_scalar_group_by_single_agg_order_by_agg_desc(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(generate_logs_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -294,14 +291,13 @@ def test_logs_scalar_group_by_single_agg_order_by_agg_desc(
def test_logs_scalar_group_by_single_agg_order_by_grouping_key_asc(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(generate_logs_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -326,14 +322,13 @@ def test_logs_scalar_group_by_single_agg_order_by_grouping_key_asc(
def test_logs_scalar_group_by_single_agg_order_by_grouping_key_desc(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(generate_logs_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -358,14 +353,13 @@ def test_logs_scalar_group_by_single_agg_order_by_grouping_key_desc(
def test_logs_scalar_group_by_multiple_aggs_order_by_first_agg_asc(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(generate_logs_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -390,14 +384,13 @@ def test_logs_scalar_group_by_multiple_aggs_order_by_first_agg_asc(
def test_logs_scalar_group_by_multiple_aggs_order_by_second_agg_desc(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(generate_logs_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -423,14 +416,13 @@ def test_logs_scalar_group_by_multiple_aggs_order_by_second_agg_desc(
def test_logs_scalar_group_by_single_agg_order_by_agg_asc_limit_2(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(generate_logs_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -455,14 +447,13 @@ def test_logs_scalar_group_by_single_agg_order_by_agg_asc_limit_2(
def test_logs_scalar_group_by_single_agg_order_by_agg_desc_limit_3(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(generate_logs_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -487,14 +478,13 @@ def test_logs_scalar_group_by_single_agg_order_by_agg_desc_limit_3(
def test_logs_scalar_group_by_order_by_grouping_key_asc_limit_2(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(generate_logs_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -519,14 +509,13 @@ def test_logs_scalar_group_by_order_by_grouping_key_asc_limit_2(
def test_traces_scalar_group_by_single_agg_no_order(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(generate_traces_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -547,14 +536,13 @@ def test_traces_scalar_group_by_single_agg_no_order(
def test_traces_scalar_group_by_single_agg_order_by_agg_asc(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(generate_traces_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -575,14 +563,13 @@ def test_traces_scalar_group_by_single_agg_order_by_agg_asc(
def test_traces_scalar_group_by_single_agg_order_by_agg_desc(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(generate_traces_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -603,14 +590,13 @@ def test_traces_scalar_group_by_single_agg_order_by_agg_desc(
def test_traces_scalar_group_by_single_agg_order_by_grouping_key_asc(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(generate_traces_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -635,14 +621,13 @@ def test_traces_scalar_group_by_single_agg_order_by_grouping_key_asc(
def test_traces_scalar_group_by_single_agg_order_by_grouping_key_desc(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(generate_traces_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -667,14 +652,13 @@ def test_traces_scalar_group_by_single_agg_order_by_grouping_key_desc(
def test_traces_scalar_group_by_multiple_aggs_order_by_first_agg_asc(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(generate_traces_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -699,14 +683,13 @@ def test_traces_scalar_group_by_multiple_aggs_order_by_first_agg_asc(
def test_traces_scalar_group_by_single_agg_order_by_agg_asc_limit_2(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(generate_traces_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -731,14 +714,13 @@ def test_traces_scalar_group_by_single_agg_order_by_agg_asc_limit_2(
def test_traces_scalar_group_by_single_agg_order_by_agg_desc_limit_3(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(generate_traces_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -763,14 +745,13 @@ def test_traces_scalar_group_by_single_agg_order_by_agg_desc_limit_3(
def test_traces_scalar_group_by_order_by_grouping_key_asc_limit_2(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(generate_traces_with_counts(now, log_or_trace_service_counts))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -795,14 +776,13 @@ def test_traces_scalar_group_by_order_by_grouping_key_asc_limit_2(
def test_metrics_scalar_group_by_single_agg_no_order(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_metrics(generate_metrics_with_values(now, metric_values_for_test))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -828,14 +808,13 @@ def test_metrics_scalar_group_by_single_agg_no_order(
def test_metrics_scalar_group_by_single_agg_order_by_agg_asc(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_metrics(generate_metrics_with_values(now, metric_values_for_test))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -866,14 +845,13 @@ def test_metrics_scalar_group_by_single_agg_order_by_agg_asc(
def test_metrics_scalar_group_by_single_agg_order_by_grouping_key_asc(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_metrics(generate_metrics_with_values(now, metric_values_for_test))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -904,14 +882,13 @@ def test_metrics_scalar_group_by_single_agg_order_by_grouping_key_asc(
def test_metrics_scalar_group_by_single_agg_order_by_agg_asc_limit_2(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_metrics(generate_metrics_with_values(now, metric_values_for_test))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -938,14 +915,13 @@ def test_metrics_scalar_group_by_single_agg_order_by_agg_asc_limit_2(
def test_metrics_scalar_group_by_single_agg_order_by_agg_desc_limit_3(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_metrics(generate_metrics_with_values(now, metric_values_for_test))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,
@@ -972,14 +948,13 @@ def test_metrics_scalar_group_by_single_agg_order_by_agg_desc_limit_3(
def test_metrics_scalar_group_by_order_by_grouping_key_asc_limit_2(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_metrics(generate_metrics_with_values(now, metric_values_for_test))
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = make_scalar_query_request(
signoz,
token,

View File

@@ -10,7 +10,7 @@ from typing import Callable, List
import pytest
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.metrics import Metrics
from fixtures.querier import (
build_builder_query,
@@ -38,7 +38,6 @@ MULTI_TEMPORALITY_FILE_24h = get_testdata_file_path(
)
def test_with_steady_values_and_reset(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
time_aggregation: str,
@@ -58,7 +57,7 @@ def test_with_steady_values_and_reset(
)
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
query = build_builder_query(
"A",
metric_name,
@@ -99,7 +98,6 @@ def test_with_steady_values_and_reset(
)
def test_group_by_endpoint(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
time_aggregation: str,
@@ -122,7 +120,7 @@ def test_group_by_endpoint(
)
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
query = build_builder_query(
"A",
metric_name,
@@ -282,7 +280,6 @@ def test_group_by_endpoint(
)
def test_for_service_with_switch(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
time_aggregation: str,
@@ -302,7 +299,7 @@ def test_for_service_with_switch(
)
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
query = build_builder_query(
"A",
metric_name,
@@ -339,7 +336,6 @@ def test_for_service_with_switch(
)
def test_for_week_long_time_range(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
time_aggregation: str,
@@ -357,7 +353,7 @@ def test_for_week_long_time_range(
)
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
query = build_builder_query(
"A",
metric_name,
@@ -383,7 +379,6 @@ def test_for_week_long_time_range(
)
def test_for_month_long_time_range(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
time_aggregation: str,
@@ -401,7 +396,7 @@ def test_for_month_long_time_range(
)
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
query = build_builder_query(
"A",
metric_name,

View File

@@ -5,18 +5,17 @@ import pytest
import requests
from sqlalchemy import sql
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.types import Operation, SigNoz
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.types import SigNoz
ANONYMOUS_USER_ID = "00000000-0000-0000-0000-000000000000"
def test_managed_roles_create_on_register(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# get the list of all roles.
response = requests.get(
@@ -45,10 +44,9 @@ def test_managed_roles_create_on_register(
def test_root_user_signoz_admin_assignment(
request: pytest.FixtureRequest,
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# Get the user from the /user/me endpoint and extract the id
user_response = requests.get(
@@ -106,10 +104,9 @@ def test_root_user_signoz_admin_assignment(
def test_anonymous_user_signoz_anonymous_assignment(
request: pytest.FixtureRequest,
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/roles"),

View File

@@ -5,22 +5,17 @@ import pytest
import requests
from sqlalchemy import sql
from fixtures.auth import (
USER_ADMIN_EMAIL,
USER_ADMIN_PASSWORD,
USER_EDITOR_EMAIL,
USER_EDITOR_PASSWORD,
)
from fixtures.types import Operation, SigNoz
from fixtures.auth import USER_EDITOR_EMAIL, USER_EDITOR_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.types import SigNoz
def test_user_invite_accept_role_grant(
request: pytest.FixtureRequest,
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
# invite a user as editor
invite_payload = {
@@ -100,7 +95,6 @@ def test_user_invite_accept_role_grant(
def test_user_update_role_grant(
request: pytest.FixtureRequest,
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
# Get the editor user's id
@@ -114,7 +108,7 @@ def test_user_update_role_grant(
editor_id = user_me_response.json()["data"]["id"]
# Get the role id for viewer
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
roles_response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
headers={"Authorization": f"Bearer {admin_token}"},
@@ -177,7 +171,6 @@ def test_user_update_role_grant(
def test_user_delete_role_revoke(
request: pytest.FixtureRequest,
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
# login with editor to get the user_id and check if user exists
@@ -191,7 +184,7 @@ def test_user_delete_role_revoke(
editor_id = user_me_response.json()["data"]["id"]
# delete the editor user
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_token = get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)
delete_response = requests.delete(
signoz.self.host_configs["8080"].get(f"/api/v1/user/{editor_id}"),
headers={"Authorization": f"Bearer {admin_token}"},

View File

@@ -14,7 +14,7 @@ import pytest
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.signoz import ROOT_USER_EMAIL, ROOT_USER_PASSWORD
from fixtures.logger import setup_logger
from fixtures.logs import Logs
@@ -98,14 +98,6 @@ def verify_table_retention_expression(
)
@pytest.fixture(name="ttl_test_suite_setup", scope="package", autouse=True)
def ttl_test_suite_setup(create_user_admin): # pylint: disable=unused-argument
# This fixture creates a admin user for the entire ttl test suite
# The create_user_admin fixture is executed just by being a dependency
print("Setting up ttl test suite")
yield
def test_set_ttl_traces_success(
signoz: types.SigNoz,
get_token: Callable[[str, str], str],
@@ -121,7 +113,7 @@ def test_set_ttl_traces_success(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -171,7 +163,7 @@ def test_set_ttl_traces_with_cold_storage(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -233,7 +225,7 @@ def test_set_ttl_metrics_success(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -285,7 +277,7 @@ def test_set_ttl_metrics_with_cold_storage(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -350,7 +342,7 @@ def test_set_ttl_invalid_type(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -382,7 +374,7 @@ def test_set_custom_retention_ttl_basic(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -444,7 +436,7 @@ def test_set_custom_retention_ttl_basic_with_cold_storage(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -512,7 +504,7 @@ def test_set_custom_retention_ttl_basic_fallback(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -576,7 +568,7 @@ def test_set_custom_retention_ttl_basic_101_times(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -628,7 +620,7 @@ def test_set_custom_retention_ttl_with_conditions(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -739,7 +731,7 @@ def test_set_custom_retention_ttl_with_invalid_cold_storage(
insert_logs(logs)
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -783,7 +775,7 @@ def test_set_custom_retention_ttl_duplicate_conditions(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -820,7 +812,7 @@ def test_set_custom_retention_ttl_invalid_condition(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -859,7 +851,7 @@ def test_get_custom_retention_ttl(
insert_logs(logs)
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
set_response = requests.post(
signoz.self.host_configs["8080"].get("/api/v2/settings/ttl"),
@@ -874,7 +866,7 @@ def test_get_custom_retention_ttl(
# Now get the TTL configuration
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
get_response = requests.get(
@@ -912,7 +904,7 @@ def test_set_ttl_logs_success(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(
@@ -954,7 +946,7 @@ def test_get_ttl_traces_success(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
set_response = requests.post(
@@ -1024,7 +1016,7 @@ def test_large_ttl_conditions_list(
}
headers = {
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
"Authorization": f"Bearer {get_token(ROOT_USER_EMAIL, ROOT_USER_PASSWORD)}"
}
response = requests.post(