Compare commits

...

5 Commits

Author SHA1 Message Date
Karan Balani
bc7253d7fe fix: fmtlint for tests 2026-03-25 17:58:10 +05:30
Karan Balani
38b2d1c462 chore: add update my user api test 2026-03-25 17:44:33 +05:30
Karan Balani
1c2d2bfc49 fix: fmtlint and remove me test 2026-03-25 17:41:12 +05:30
Karan Balani
a8386888fb fix: integration tests 2026-03-25 17:28:43 +05:30
Karan Balani
1d79fb2f4f test: integration tests now use v2 user apis 2026-03-25 16:39:56 +05:30
12 changed files with 640 additions and 309 deletions

View File

@@ -654,19 +654,6 @@ def get_oidc_domain(signoz: types.SigNoz, admin_token: str) -> dict:
)
def get_user_by_email(signoz: types.SigNoz, admin_token: str, email: str) -> dict:
"""Helper to get a user by email."""
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
return next(
(user for user in response.json()["data"] if user["email"] == email),
None,
)
def perform_oidc_login(
signoz: types.SigNoz, # pylint: disable=unused-argument
idp: types.TestContainerIDP,

View File

@@ -3,6 +3,9 @@ import os
from typing import Any
import isodate
import requests
from fixtures import types
# parses the given timestamp string from ISO format to datetime.datetime
@@ -31,3 +34,104 @@ def parse_duration(duration: Any) -> datetime.timedelta:
def get_testdata_file_path(file: str) -> str:
testdata_dir = os.path.join(os.path.dirname(__file__), "..", "testdata")
return os.path.join(testdata_dir, file)
def get_user_by_email(signoz: types.SigNoz, admin_token: str, email: str) -> dict:
"""Helper to get a user by email."""
headers = {"Authorization": f"Bearer {admin_token}"} if admin_token else {}
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users"),
timeout=2,
headers=headers,
)
return next(
(user for user in response.json()["data"] if user["email"] == email),
None,
)
def get_user_role_names(signoz: types.SigNoz, admin_token: str, user_id: str) -> list:
"""Helper to get the user roles by user ID"""
headers = {"Authorization": f"Bearer {admin_token}"} if admin_token else {}
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user_id}/roles"),
timeout=2,
headers=headers,
)
roles = response.json()["data"]
if not roles:
return []
return [role["name"] for role in roles]
def get_user_roles(signoz: types.SigNoz, admin_token: str, user_id: str) -> list:
"""Helper to get the user roles (full objects) by user ID"""
headers = {"Authorization": f"Bearer {admin_token}"} if admin_token else {}
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user_id}/roles"),
timeout=2,
headers=headers,
)
return response.json()["data"] or []
def add_user_role(
signoz: types.SigNoz, admin_token: str, user_id: str, role_name: str
) -> None:
"""Helper to add a role to a user via POST /api/v2/users/{id}/roles"""
response = requests.post(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user_id}/roles"),
json={"name": role_name},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert (
response.status_code == 200
), f"failed to add role {role_name}: {response.text}"
def remove_user_role_by_name(
signoz: types.SigNoz, admin_token: str, user_id: str, role_name: str
) -> None:
"""Helper to remove a role from a user by role name"""
roles = get_user_roles(signoz, admin_token, user_id)
role_id = next((r["id"] for r in roles if r["name"] == role_name), None)
assert role_id is not None, f"role {role_name} not found for user {user_id}"
response = requests.delete(
signoz.self.host_configs["8080"].get(
f"/api/v2/users/{user_id}/roles/{role_id}"
),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert (
response.status_code == 204
), f"failed to remove role {role_name}: {response.text}"
def set_user_roles(
signoz: types.SigNoz, admin_token: str, user_id: str, desired_role_names: list
) -> None:
"""Helper to set exact roles for a user using POST/DELETE endpoints"""
current_roles = get_user_roles(signoz, admin_token, user_id)
current_names = {r["name"] for r in current_roles}
desired_names = set(desired_role_names)
# Remove roles not in desired set
for role in current_roles:
if role["name"] not in desired_names:
response = requests.delete(
signoz.self.host_configs["8080"].get(
f"/api/v2/users/{user_id}/roles/{role['id']}"
),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert response.status_code == 204
# Add roles not in current set
for name in desired_names:
if name not in current_names:
add_user_role(signoz, admin_token, user_id, name)

View File

@@ -12,11 +12,8 @@ from fixtures.auth import (
USER_ADMIN_PASSWORD,
add_license,
)
from fixtures.idputils import (
get_saml_domain,
get_user_by_email,
perform_saml_login,
)
from fixtures.idputils import get_saml_domain, perform_saml_login
from fixtures.utils import get_user_by_email, get_user_role_names
from fixtures.types import Operation, SigNoz, TestContainerDocker, TestContainerIDP
@@ -131,26 +128,12 @@ def test_saml_authn(
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Assert that the user was created in signoz.
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(
user
for user in user_response
if user["email"] == "viewer@saml.integration.test"
),
None,
)
found_user = get_user_by_email(signoz, admin_token, "viewer@saml.integration.test")
assert found_user is not None
assert found_user["role"] == "VIEWER"
# Confirm role
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-viewer" in found_user_role_names
def test_idp_initiated_saml_authn(
@@ -182,26 +165,14 @@ def test_idp_initiated_saml_authn(
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Assert that the user was created in signoz.
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
found_user = get_user_by_email(
signoz, admin_token, "viewer.idp.initiated@saml.integration.test"
)
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(
user
for user in user_response
if user["email"] == "viewer.idp.initiated@saml.integration.test"
),
None,
)
assert found_user is not None
assert found_user["role"] == "VIEWER"
# Confirm role
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-viewer" in found_user_role_names
def test_saml_update_domain_with_group_mappings(
@@ -271,7 +242,8 @@ def test_saml_role_mapping_single_group_admin(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "ADMIN"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-admin" in found_user_role_names
def test_saml_role_mapping_single_group_editor(
@@ -297,7 +269,8 @@ def test_saml_role_mapping_single_group_editor(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "EDITOR"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-editor" in found_user_role_names
def test_saml_role_mapping_multiple_groups_highest_wins(
@@ -327,7 +300,8 @@ def test_saml_role_mapping_multiple_groups_highest_wins(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "EDITOR"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-editor" in found_user_role_names
def test_saml_role_mapping_explicit_viewer_group(
@@ -354,7 +328,8 @@ def test_saml_role_mapping_explicit_viewer_group(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "VIEWER"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-viewer" in found_user_role_names
def test_saml_role_mapping_unmapped_group_uses_default(
@@ -380,7 +355,8 @@ def test_saml_role_mapping_unmapped_group_uses_default(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "VIEWER"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-viewer" in found_user_role_names
def test_saml_update_domain_with_use_role_claim(
@@ -457,7 +433,8 @@ def test_saml_role_mapping_role_claim_takes_precedence(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "ADMIN"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-admin" in found_user_role_names
def test_saml_role_mapping_invalid_role_claim_fallback(
@@ -487,7 +464,8 @@ def test_saml_role_mapping_invalid_role_claim_fallback(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "EDITOR"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-editor" in found_user_role_names
def test_saml_role_mapping_case_insensitive(
@@ -517,7 +495,8 @@ def test_saml_role_mapping_case_insensitive(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "ADMIN"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-admin" in found_user_role_names
def test_saml_name_mapping(
@@ -545,7 +524,8 @@ def test_saml_name_mapping(
assert (
found_user["displayName"] == "Jane"
) # We are only mapping the first name here
assert found_user["role"] == "VIEWER"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-viewer" in found_user_role_names
def test_saml_empty_name_fallback(
@@ -570,7 +550,8 @@ def test_saml_empty_name_fallback(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "VIEWER"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-viewer" in found_user_role_names
def test_saml_sso_login_activates_pending_invite_user(
@@ -613,7 +594,8 @@ def test_saml_sso_login_activates_pending_invite_user(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["status"] == "active"
assert found_user["role"] == "VIEWER"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-viewer" in found_user_role_names
def test_saml_sso_deleted_user_gets_new_user_on_login(
@@ -680,7 +662,7 @@ def test_saml_sso_deleted_user_gets_new_user_on_login(
# Verify a NEW active user was auto-provisioned via SSO
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
signoz.self.host_configs["8080"].get("/api/v2/users"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
@@ -694,4 +676,7 @@ def test_saml_sso_deleted_user_gets_new_user_on_login(
)
assert found_user is not None
assert found_user["status"] == "active"
assert found_user["role"] == "VIEWER" # default role from SSO domain config
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert (
"signoz-viewer" in found_user_role_names
) # default role from SSO domain config

View File

@@ -11,11 +11,8 @@ from fixtures.auth import (
USER_ADMIN_PASSWORD,
add_license,
)
from fixtures.idputils import (
get_oidc_domain,
get_user_by_email,
perform_oidc_login,
)
from fixtures.idputils import get_oidc_domain, perform_oidc_login
from fixtures.utils import get_user_by_email, get_user_role_names
from fixtures.types import Operation, SigNoz, TestContainerDocker, TestContainerIDP
@@ -112,26 +109,12 @@ def test_oidc_authn(
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Assert that the user was created in signoz.
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(
user
for user in user_response
if user["email"] == "viewer@oidc.integration.test"
),
None,
)
found_user = get_user_by_email(signoz, admin_token, "viewer@oidc.integration.test")
assert found_user is not None
assert found_user["role"] == "VIEWER"
# Confirm role
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-viewer" in found_user_role_names
def test_oidc_update_domain_with_group_mappings(
@@ -208,7 +191,8 @@ def test_oidc_role_mapping_single_group_admin(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "ADMIN"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-admin" in found_user_role_names
def test_oidc_role_mapping_single_group_editor(
@@ -234,7 +218,8 @@ def test_oidc_role_mapping_single_group_editor(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "EDITOR"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-editor" in found_user_role_names
def test_oidc_role_mapping_multiple_groups_highest_wins(
@@ -264,7 +249,8 @@ def test_oidc_role_mapping_multiple_groups_highest_wins(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "ADMIN"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-admin" in found_user_role_names
def test_oidc_role_mapping_explicit_viewer_group(
@@ -291,7 +277,8 @@ def test_oidc_role_mapping_explicit_viewer_group(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "VIEWER"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-viewer" in found_user_role_names
def test_oidc_role_mapping_unmapped_group_uses_default(
@@ -317,7 +304,8 @@ def test_oidc_role_mapping_unmapped_group_uses_default(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "VIEWER"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-viewer" in found_user_role_names
def test_oidc_update_domain_with_use_role_claim(
@@ -397,7 +385,8 @@ def test_oidc_role_mapping_role_claim_takes_precedence(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "ADMIN"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-admin" in found_user_role_names
def test_oidc_role_mapping_invalid_role_claim_fallback(
@@ -429,7 +418,8 @@ def test_oidc_role_mapping_invalid_role_claim_fallback(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "EDITOR"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-editor" in found_user_role_names
def test_oidc_role_mapping_case_insensitive(
@@ -459,7 +449,8 @@ def test_oidc_role_mapping_case_insensitive(
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "EDITOR"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-editor" in found_user_role_names
def test_oidc_name_mapping(
@@ -482,20 +473,13 @@ def test_oidc_name_mapping(
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert response.status_code == HTTPStatus.OK
users = response.json()["data"]
found_user = next((u for u in users if u["email"] == email), None)
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
# Keycloak concatenates firstName + lastName into "name" claim
assert found_user["displayName"] == "John Doe"
assert found_user["role"] == "VIEWER" # Default role
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-viewer" in found_user_role_names # Default role
def test_oidc_empty_name_uses_fallback(
@@ -518,19 +502,12 @@ def test_oidc_empty_name_uses_fallback(
)
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert response.status_code == HTTPStatus.OK
users = response.json()["data"]
found_user = next((u for u in users if u["email"] == email), None)
found_user = get_user_by_email(signoz, admin_token, email)
# User should still be created even with empty name
assert found_user is not None
assert found_user["role"] == "VIEWER"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-viewer" in found_user_role_names
# Note: displayName may be empty - this is a known limitation
@@ -570,16 +547,9 @@ def test_oidc_sso_login_activates_pending_invite_user(
signoz, idp, driver, get_session_context, idp_login, email, "password123"
)
# User should be active with ADMIN role from invite, not VIEWER from SSO
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
found_user = next(
(user for user in response.json()["data"] if user["email"] == email),
None,
)
# User should be active with VIEWER role from SSO
found_user = get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["status"] == "active"
assert found_user["role"] == "VIEWER"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-viewer" in found_user_role_names

View File

@@ -5,6 +5,7 @@ import requests
from fixtures import types
from fixtures.logger import setup_logger
from fixtures.utils import get_user_by_email, get_user_role_names
logger = setup_logger(__name__)
@@ -74,31 +75,10 @@ def test_register(signoz: types.SigNoz, get_token: Callable[[str, str], str]) ->
admin_token = get_token("admin@integration.test", "password123Z$")
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(user for user in user_response if user["email"] == "admin@integration.test"),
None,
)
found_user = get_user_by_email(signoz, admin_token, "admin@integration.test")
assert found_user is not None
assert found_user["role"] == "ADMIN"
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/user/{found_user["id"]}"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["data"]["role"] == "ADMIN"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-admin" in found_user_role_names
def test_invite_and_register(
@@ -120,21 +100,11 @@ def test_invite_and_register(
assert invited_user["role"] == "EDITOR"
# Verify the user user appears in the users list but as pending_invite status
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(user for user in user_response if user["email"] == "editor@integration.test"),
None,
)
found_user = get_user_by_email(signoz, admin_token, "editor@integration.test")
assert found_user is not None
assert found_user["status"] == "pending_invite"
assert found_user["role"] == "EDITOR"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-editor" in found_user_role_names
reset_token = invited_user["token"]
@@ -152,7 +122,7 @@ def test_invite_and_register(
# Verify that an admin endpoint cannot be called by the editor user
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
signoz.self.host_configs["8080"].get("/api/v2/users"),
timeout=2,
headers={"Authorization": f"Bearer {editor_token}"},
)
@@ -160,24 +130,12 @@ def test_invite_and_register(
assert response.status_code == HTTPStatus.FORBIDDEN
# Verify that the editor user status has been updated to ACTIVE
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={
"Authorization": f"Bearer {get_token("admin@integration.test", "password123Z$")}"
},
)
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(user for user in user_response if user["email"] == "editor@integration.test"),
None,
)
admin_token = get_token("admin@integration.test", "password123Z$")
found_user = get_user_by_email(signoz, admin_token, "editor@integration.test")
assert found_user is not None
assert found_user["role"] == "EDITOR"
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-editor" in found_user_role_names
assert found_user["displayName"] == "editor"
assert found_user["email"] == "editor@integration.test"
assert found_user["status"] == "active"
@@ -221,25 +179,7 @@ def test_self_access(
) -> None:
admin_token = get_token("admin@integration.test", "password123Z$")
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(user for user in user_response if user["email"] == "editor@integration.test"),
None,
)
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/user/{found_user['id']}"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["data"]["role"] == "EDITOR"
found_user = get_user_by_email(signoz, admin_token, "editor@integration.test")
assert found_user is not None
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
assert "signoz-editor" in found_user_role_names

View File

@@ -26,7 +26,7 @@ def test_api_key(signoz: types.SigNoz, get_token: Callable[[str, str], str]) ->
assert "token" in pat_response["data"]
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
signoz.self.host_configs["8080"].get("/api/v2/users"),
timeout=2,
headers={"SIGNOZ-API-KEY": f"{pat_response["data"]["token"]}"},
)
@@ -85,7 +85,7 @@ def test_api_key_role(
assert "token" in pat_response["data"]
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
signoz.self.host_configs["8080"].get("/api/v2/users"),
timeout=2,
headers={"SIGNOZ-API-KEY": f"{pat_response["data"]["token"]}"},
)
@@ -109,7 +109,7 @@ def test_api_key_role(
assert "token" in pat_response["data"]
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
signoz.self.host_configs["8080"].get("/api/v2/users"),
timeout=2,
headers={"SIGNOZ-API-KEY": f"{pat_response["data"]["token"]}"},
)

View File

@@ -6,6 +6,7 @@ from sqlalchemy import sql
from fixtures import types
from fixtures.logger import setup_logger
from fixtures.utils import get_user_by_email
logger = setup_logger(__name__)
@@ -35,23 +36,10 @@ def test_change_password(
assert response.status_code == HTTPStatus.NO_CONTENT
# Get the user id
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(
user
for user in user_response
if user["email"] == "admin+password@integration.test"
),
None,
found_user = get_user_by_email(
signoz, admin_token, "admin+password@integration.test"
)
assert found_user is not None
# Try logging in with the password
token = get_token("admin+password@integration.test", "password123Z$")
@@ -100,23 +88,10 @@ def test_reset_password(
admin_token = get_token("admin@integration.test", "password123Z$")
# Get the user id for admin+password@integration.test
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(
user
for user in user_response
if user["email"] == "admin+password@integration.test"
),
None,
found_user = get_user_by_email(
signoz, admin_token, "admin+password@integration.test"
)
assert found_user is not None
response = requests.get(
signoz.self.host_configs["8080"].get(
@@ -158,23 +133,10 @@ def test_reset_password_with_no_password(
admin_token = get_token("admin@integration.test", "password123Z$")
# Get the user id for admin+password@integration.test
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(
user
for user in user_response
if user["email"] == "admin+password@integration.test"
),
None,
found_user = get_user_by_email(
signoz, admin_token, "admin+password@integration.test"
)
assert found_user is not None
with signoz.sqlstore.conn.connect() as conn:
result = conn.execute(
@@ -305,17 +267,7 @@ def test_forgot_password_creates_reset_token(
# Verify reset password token was created by querying the database
# First, get the user ID
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(user for user in user_response if user["email"] == "forgot@integration.test"),
None,
)
found_user = get_user_by_email(signoz, admin_token, "forgot@integration.test")
assert found_user is not None
reset_token = None
@@ -371,17 +323,7 @@ def test_reset_password_with_expired_token(
admin_token = get_token("admin@integration.test", "password123Z$")
# Get user ID for the forgot@integration.test user
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(user for user in user_response if user["email"] == "forgot@integration.test"),
None,
)
found_user = get_user_by_email(signoz, admin_token, "forgot@integration.test")
assert found_user is not None
# Get org ID

View File

@@ -4,6 +4,12 @@ from typing import Callable, Tuple
import requests
from fixtures import types
from fixtures.utils import (
add_user_role,
get_user_role_names,
remove_user_role_by_name,
set_user_roles,
)
def test_change_role(
@@ -40,7 +46,7 @@ def test_change_role(
)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user/me"),
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
@@ -58,22 +64,13 @@ def test_change_role(
assert response.status_code == HTTPStatus.FORBIDDEN
# Change the new user's role - move to ADMIN
response = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/user/{new_user_id}"),
json={
"displayName": "role change user",
"role": "ADMIN",
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.OK
# Change the new user's role - add ADMIN, remove VIEWER
add_user_role(signoz, admin_token, new_user_id, "signoz-admin")
remove_user_role_by_name(signoz, admin_token, new_user_id, "signoz-viewer")
# Make some API calls again
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user/me"),
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
@@ -106,3 +103,268 @@ def test_change_role(
)
assert response.status_code == HTTPStatus.OK
def test_remove_all_roles(
signoz: types.SigNoz,
get_token: Callable[[str, str], str],
get_tokens: Callable[[str, str], Tuple[str, str]],
):
admin_token = get_token("admin@integration.test", "password123Z$")
# Create a new user as EDITOR
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/invite"),
json={"email": "admin+noroles@integration.test", "role": "EDITOR"},
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.CREATED
invited_user = response.json()["data"]
reset_token = invited_user["token"]
# Activate user via reset password
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/resetPassword"),
json={"password": "password123Z$", "token": reset_token},
timeout=2,
)
assert response.status_code == HTTPStatus.NO_CONTENT
# Login and get user id
new_user_token, new_user_refresh_token = get_tokens(
"admin+noroles@integration.test", "password123Z$"
)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
assert response.status_code == HTTPStatus.OK
new_user_id = response.json()["data"]["id"]
# Validate the user has the editor role
role_names = get_user_role_names(signoz, admin_token, new_user_id)
assert role_names is not None
assert "signoz-editor" in role_names
# Remove all roles via DELETE endpoint
set_user_roles(signoz, admin_token, new_user_id, [])
# Validate the user has no roles
role_names = get_user_role_names(signoz, admin_token, new_user_id)
assert role_names is None or len(role_names) == 0
# Old token should be invalidated after role change
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
assert response.status_code == HTTPStatus.UNAUTHORIZED
# Token rotation should also fail for a user with no roles
# (the session endpoint requires roles to build an identity)
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v2/sessions/rotate"),
json={
"refreshToken": new_user_refresh_token,
},
headers={"Authorization": f"Bearer {new_user_token}"},
timeout=2,
)
assert (
response.status_code != HTTPStatus.OK
), "token rotation should fail for user with no roles"
def test_multiple_roles(
signoz: types.SigNoz,
get_token: Callable[[str, str], str],
get_tokens: Callable[[str, str], Tuple[str, str]],
):
admin_token = get_token("admin@integration.test", "password123Z$")
# Create a new user as VIEWER
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/invite"),
json={"email": "admin+multirole@integration.test", "role": "VIEWER"},
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == HTTPStatus.CREATED
invited_user = response.json()["data"]
reset_token = invited_user["token"]
# Activate user via reset password
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/resetPassword"),
json={"password": "password123Z$", "token": reset_token},
timeout=2,
)
assert response.status_code == HTTPStatus.NO_CONTENT
# Login and get user id
new_user_token, new_user_refresh_token = get_tokens(
"admin+multirole@integration.test", "password123Z$"
)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
assert response.status_code == HTTPStatus.OK
new_user_id = response.json()["data"]["id"]
# Validate user starts with viewer role
role_names = get_user_role_names(signoz, admin_token, new_user_id)
assert role_names is not None
assert role_names == [
"signoz-viewer"
], f"expected ['signoz-viewer'], got {role_names}"
# As viewer, admin-only APIs should be forbidden
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/org/preferences"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
assert response.status_code == HTTPStatus.FORBIDDEN
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
assert response.status_code == HTTPStatus.FORBIDDEN
# Assign multiple roles: add editor (viewer already assigned)
add_user_role(signoz, admin_token, new_user_id, "signoz-editor")
# Validate user has both roles
role_names = get_user_role_names(signoz, admin_token, new_user_id)
assert role_names is not None
assert sorted(role_names) == [
"signoz-editor",
"signoz-viewer",
], f"expected ['signoz-editor', 'signoz-viewer'], got {sorted(role_names)}"
# Rotate token to pick up new roles
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v2/sessions/rotate"),
json={"refreshToken": new_user_refresh_token},
headers={"Authorization": f"Bearer {new_user_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.OK
rotate_response = response.json()["data"]
new_user_token = rotate_response["accessToken"]
new_user_refresh_token = rotate_response["refreshToken"]
# Verify /me includes both roles
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
assert response.status_code == HTTPStatus.OK
me_role_names = sorted(
ur["role"]["name"] for ur in response.json()["data"]["userRoles"]
)
assert me_role_names == [
"signoz-editor",
"signoz-viewer",
], f"expected ['signoz-editor', 'signoz-viewer'] in /me, got {me_role_names}"
# Editor+viewer still cannot access admin-only APIs
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/org/preferences"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
assert response.status_code == HTTPStatus.FORBIDDEN
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
assert response.status_code == HTTPStatus.FORBIDDEN
# Add admin role (editor + viewer already assigned)
add_user_role(signoz, admin_token, new_user_id, "signoz-admin")
role_names = get_user_role_names(signoz, admin_token, new_user_id)
assert sorted(role_names) == [
"signoz-admin",
"signoz-editor",
"signoz-viewer",
], f"expected all three roles, got {sorted(role_names)}"
# Rotate token to pick up admin role
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v2/sessions/rotate"),
json={"refreshToken": new_user_refresh_token},
headers={"Authorization": f"Bearer {new_user_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.OK
rotate_response = response.json()["data"]
new_user_token = rotate_response["accessToken"]
new_user_refresh_token = rotate_response["refreshToken"]
# Now with admin role, admin-only APIs should succeed
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/org/preferences"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
assert response.status_code == HTTPStatus.OK
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
assert response.status_code == HTTPStatus.OK
# Reduce back to single viewer role
set_user_roles(signoz, admin_token, new_user_id, ["signoz-viewer"])
role_names = get_user_role_names(signoz, admin_token, new_user_id)
assert role_names == [
"signoz-viewer"
], f"expected ['signoz-viewer'] after reduction, got {role_names}"
# Rotate token to pick up reduced roles
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v2/sessions/rotate"),
json={"refreshToken": new_user_refresh_token},
headers={"Authorization": f"Bearer {new_user_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.OK
rotate_response = response.json()["data"]
new_user_token = rotate_response["accessToken"]
# After reducing to viewer, admin-only APIs should be forbidden again
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/org/preferences"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
assert response.status_code == HTTPStatus.FORBIDDEN
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users"),
timeout=2,
headers={"Authorization": f"Bearer {new_user_token}"},
)
assert response.status_code == HTTPStatus.FORBIDDEN

View File

@@ -52,7 +52,7 @@ def test_root_user_signoz_admin_assignment(
# Get the user from the /user/me endpoint and extract the id
user_response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user/me"),
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)

View File

@@ -47,7 +47,7 @@ def test_user_invite_accept_role_grant(
# Login with editor email and password
editor_token = get_token(USER_EDITOR_EMAIL, USER_EDITOR_PASSWORD)
user_me_response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user/me"),
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
headers={"Authorization": f"Bearer {editor_token}"},
timeout=2,
)
@@ -102,7 +102,7 @@ def test_user_update_role_grant(
# Get the editor user's id
editor_token = get_token(USER_EDITOR_EMAIL, USER_EDITOR_PASSWORD)
user_me_response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user/me"),
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
headers={"Authorization": f"Bearer {editor_token}"},
timeout=2,
)
@@ -120,15 +120,37 @@ def test_user_update_role_grant(
roles_data = roles_response.json()["data"]
org_id = roles_data[0]["orgId"]
# Update the user's role to viewer
update_payload = {"role": "VIEWER"}
update_response = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/user/{editor_id}"),
json=update_payload,
# Add the viewer role to the user
add_response = requests.post(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{editor_id}/roles"),
json={"name": "signoz-viewer"},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert update_response.status_code == HTTPStatus.OK
assert add_response.status_code == HTTPStatus.OK
# Get the editor role id so we can remove it
roles_list_response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{editor_id}/roles"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert roles_list_response.status_code == HTTPStatus.OK
editor_role_id = next(
r["id"]
for r in roles_list_response.json()["data"]
if r["name"] == "signoz-editor"
)
# Remove the editor role
remove_response = requests.delete(
signoz.self.host_configs["8080"].get(
f"/api/v2/users/{editor_id}/roles/{editor_role_id}"
),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert remove_response.status_code == HTTPStatus.NO_CONTENT
# Check that user no longer has the editor role in the db
with signoz.sqlstore.conn.connect() as conn:
@@ -179,7 +201,7 @@ def test_user_delete_role_revoke(
# login with editor to get the user_id and check if user exists
editor_token = get_token(USER_EDITOR_EMAIL, USER_EDITOR_PASSWORD)
user_me_response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user/me"),
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
headers={"Authorization": f"Bearer {editor_token}"},
timeout=2,
)
@@ -222,3 +244,120 @@ def test_user_delete_role_revoke(
else:
_user = f"user:organization/{org_id}/user/{editor_id}"
assert row["_user"] != _user
def test_update_my_user(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Invite a viewer user
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/invite"),
json={"email": "admin+updateme@integration.test", "role": "VIEWER"},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.CREATED
reset_token = response.json()["data"]["token"]
# Activate user
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/resetPassword"),
json={"password": "password123Z$", "token": reset_token},
timeout=2,
)
assert response.status_code == HTTPStatus.NO_CONTENT
# Login as viewer
viewer_token = get_token("admin+updateme@integration.test", "password123Z$")
# Update own display name via PUT /api/v2/users/me
response = requests.put(
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
json={"displayName": "viewer updated name"},
headers={"Authorization": f"Bearer {viewer_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.NO_CONTENT
# Verify the update via GET /api/v2/users/me
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
headers={"Authorization": f"Bearer {viewer_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.OK
assert response.json()["data"]["displayName"] == "viewer updated name"
def test_update_user_by_id(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Invite a user to update
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/invite"),
json={"email": "admin+updatetest@integration.test", "role": "VIEWER"},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.CREATED
reset_token = response.json()["data"]["token"]
# Activate user
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/resetPassword"),
json={"password": "password123Z$", "token": reset_token},
timeout=2,
)
assert response.status_code == HTTPStatus.NO_CONTENT
# Get user id
user_token = get_token("admin+updatetest@integration.test", "password123Z$")
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
headers={"Authorization": f"Bearer {user_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.OK
user_id = response.json()["data"]["id"]
# Admin updates user's display name via PUT /api/v2/users/{id}
response = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user_id}"),
json={"displayName": "renamed user"},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.NO_CONTENT
# Verify via GET /api/v2/users/{id}
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user_id}"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.OK
assert response.json()["data"]["displayName"] == "renamed user"
# Self-update should be rejected
me_response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
admin_id = me_response.json()["data"]["id"]
response = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{admin_id}"),
json={"displayName": "self update"},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.BAD_REQUEST

View File

@@ -14,7 +14,7 @@ def test_root_user_created(signoz: types.SigNoz) -> None:
The root user service reconciles asynchronously after startup.
Phase 1: Poll /api/v1/version until setupCompleted=true.
Phase 2: Poll /api/v1/user until it returns 200, confirming the root
Phase 2: Poll /api/v2/users until it returns 200, confirming the root
user actually exists and the impersonation provider works.
"""
# Phase 1: wait for setupCompleted
@@ -39,13 +39,13 @@ def test_root_user_created(signoz: types.SigNoz) -> None:
# Phase 2: wait for root user to be fully resolved
for attempt in range(15):
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
signoz.self.host_configs["8080"].get("/api/v2/users"),
timeout=2,
)
if response.status_code == HTTPStatus.OK:
return
logger.info(
"Attempt %s: /api/v1/user returned %s, retrying ...",
"Attempt %s: /api/v2/users returned %s, retrying ...",
attempt + 1,
response.status_code,
)

View File

@@ -4,6 +4,7 @@ import requests
from fixtures import types
from fixtures.logger import setup_logger
from fixtures.utils import get_user_role_names
logger = setup_logger(__name__)
@@ -32,7 +33,7 @@ def test_impersonated_user_is_admin(signoz: types.SigNoz) -> None:
Listing users is an admin-only endpoint.
"""
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
signoz.self.host_configs["8080"].get("/api/v2/users"),
timeout=2,
)
@@ -46,4 +47,5 @@ def test_impersonated_user_is_admin(signoz: types.SigNoz) -> None:
None,
)
assert root_user is not None
assert root_user["role"] == "ADMIN"
root_user_role_names = get_user_role_names(signoz, None, root_user["id"])
assert "signoz-admin" in root_user_role_names