mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-31 17:40:25 +01:00
Compare commits
5 Commits
fix/array-
...
feat/v2-us
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bc7253d7fe | ||
|
|
38b2d1c462 | ||
|
|
1c2d2bfc49 | ||
|
|
a8386888fb | ||
|
|
1d79fb2f4f |
@@ -654,19 +654,6 @@ def get_oidc_domain(signoz: types.SigNoz, admin_token: str) -> dict:
|
||||
)
|
||||
|
||||
|
||||
def get_user_by_email(signoz: types.SigNoz, admin_token: str, email: str) -> dict:
|
||||
"""Helper to get a user by email."""
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
return next(
|
||||
(user for user in response.json()["data"] if user["email"] == email),
|
||||
None,
|
||||
)
|
||||
|
||||
|
||||
def perform_oidc_login(
|
||||
signoz: types.SigNoz, # pylint: disable=unused-argument
|
||||
idp: types.TestContainerIDP,
|
||||
|
||||
@@ -3,6 +3,9 @@ import os
|
||||
from typing import Any
|
||||
|
||||
import isodate
|
||||
import requests
|
||||
|
||||
from fixtures import types
|
||||
|
||||
|
||||
# parses the given timestamp string from ISO format to datetime.datetime
|
||||
@@ -31,3 +34,104 @@ def parse_duration(duration: Any) -> datetime.timedelta:
|
||||
def get_testdata_file_path(file: str) -> str:
|
||||
testdata_dir = os.path.join(os.path.dirname(__file__), "..", "testdata")
|
||||
return os.path.join(testdata_dir, file)
|
||||
|
||||
|
||||
def get_user_by_email(signoz: types.SigNoz, admin_token: str, email: str) -> dict:
|
||||
"""Helper to get a user by email."""
|
||||
headers = {"Authorization": f"Bearer {admin_token}"} if admin_token else {}
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users"),
|
||||
timeout=2,
|
||||
headers=headers,
|
||||
)
|
||||
return next(
|
||||
(user for user in response.json()["data"] if user["email"] == email),
|
||||
None,
|
||||
)
|
||||
|
||||
|
||||
def get_user_role_names(signoz: types.SigNoz, admin_token: str, user_id: str) -> list:
|
||||
"""Helper to get the user roles by user ID"""
|
||||
headers = {"Authorization": f"Bearer {admin_token}"} if admin_token else {}
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user_id}/roles"),
|
||||
timeout=2,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
roles = response.json()["data"]
|
||||
if not roles:
|
||||
return []
|
||||
|
||||
return [role["name"] for role in roles]
|
||||
|
||||
|
||||
def get_user_roles(signoz: types.SigNoz, admin_token: str, user_id: str) -> list:
|
||||
"""Helper to get the user roles (full objects) by user ID"""
|
||||
headers = {"Authorization": f"Bearer {admin_token}"} if admin_token else {}
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user_id}/roles"),
|
||||
timeout=2,
|
||||
headers=headers,
|
||||
)
|
||||
return response.json()["data"] or []
|
||||
|
||||
|
||||
def add_user_role(
|
||||
signoz: types.SigNoz, admin_token: str, user_id: str, role_name: str
|
||||
) -> None:
|
||||
"""Helper to add a role to a user via POST /api/v2/users/{id}/roles"""
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user_id}/roles"),
|
||||
json={"name": role_name},
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert (
|
||||
response.status_code == 200
|
||||
), f"failed to add role {role_name}: {response.text}"
|
||||
|
||||
|
||||
def remove_user_role_by_name(
|
||||
signoz: types.SigNoz, admin_token: str, user_id: str, role_name: str
|
||||
) -> None:
|
||||
"""Helper to remove a role from a user by role name"""
|
||||
roles = get_user_roles(signoz, admin_token, user_id)
|
||||
role_id = next((r["id"] for r in roles if r["name"] == role_name), None)
|
||||
assert role_id is not None, f"role {role_name} not found for user {user_id}"
|
||||
response = requests.delete(
|
||||
signoz.self.host_configs["8080"].get(
|
||||
f"/api/v2/users/{user_id}/roles/{role_id}"
|
||||
),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert (
|
||||
response.status_code == 204
|
||||
), f"failed to remove role {role_name}: {response.text}"
|
||||
|
||||
|
||||
def set_user_roles(
|
||||
signoz: types.SigNoz, admin_token: str, user_id: str, desired_role_names: list
|
||||
) -> None:
|
||||
"""Helper to set exact roles for a user using POST/DELETE endpoints"""
|
||||
current_roles = get_user_roles(signoz, admin_token, user_id)
|
||||
current_names = {r["name"] for r in current_roles}
|
||||
desired_names = set(desired_role_names)
|
||||
|
||||
# Remove roles not in desired set
|
||||
for role in current_roles:
|
||||
if role["name"] not in desired_names:
|
||||
response = requests.delete(
|
||||
signoz.self.host_configs["8080"].get(
|
||||
f"/api/v2/users/{user_id}/roles/{role['id']}"
|
||||
),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == 204
|
||||
|
||||
# Add roles not in current set
|
||||
for name in desired_names:
|
||||
if name not in current_names:
|
||||
add_user_role(signoz, admin_token, user_id, name)
|
||||
|
||||
@@ -12,11 +12,8 @@ from fixtures.auth import (
|
||||
USER_ADMIN_PASSWORD,
|
||||
add_license,
|
||||
)
|
||||
from fixtures.idputils import (
|
||||
get_saml_domain,
|
||||
get_user_by_email,
|
||||
perform_saml_login,
|
||||
)
|
||||
from fixtures.idputils import get_saml_domain, perform_saml_login
|
||||
from fixtures.utils import get_user_by_email, get_user_role_names
|
||||
from fixtures.types import Operation, SigNoz, TestContainerDocker, TestContainerIDP
|
||||
|
||||
|
||||
@@ -131,26 +128,12 @@ def test_saml_authn(
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
# Assert that the user was created in signoz.
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
user_response = response.json()["data"]
|
||||
found_user = next(
|
||||
(
|
||||
user
|
||||
for user in user_response
|
||||
if user["email"] == "viewer@saml.integration.test"
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
found_user = get_user_by_email(signoz, admin_token, "viewer@saml.integration.test")
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
|
||||
# Confirm role
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-viewer" in found_user_role_names
|
||||
|
||||
|
||||
def test_idp_initiated_saml_authn(
|
||||
@@ -182,26 +165,14 @@ def test_idp_initiated_saml_authn(
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
# Assert that the user was created in signoz.
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
found_user = get_user_by_email(
|
||||
signoz, admin_token, "viewer.idp.initiated@saml.integration.test"
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
user_response = response.json()["data"]
|
||||
found_user = next(
|
||||
(
|
||||
user
|
||||
for user in user_response
|
||||
if user["email"] == "viewer.idp.initiated@saml.integration.test"
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
|
||||
# Confirm role
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-viewer" in found_user_role_names
|
||||
|
||||
|
||||
def test_saml_update_domain_with_group_mappings(
|
||||
@@ -271,7 +242,8 @@ def test_saml_role_mapping_single_group_admin(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "ADMIN"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-admin" in found_user_role_names
|
||||
|
||||
|
||||
def test_saml_role_mapping_single_group_editor(
|
||||
@@ -297,7 +269,8 @@ def test_saml_role_mapping_single_group_editor(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "EDITOR"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-editor" in found_user_role_names
|
||||
|
||||
|
||||
def test_saml_role_mapping_multiple_groups_highest_wins(
|
||||
@@ -327,7 +300,8 @@ def test_saml_role_mapping_multiple_groups_highest_wins(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "EDITOR"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-editor" in found_user_role_names
|
||||
|
||||
|
||||
def test_saml_role_mapping_explicit_viewer_group(
|
||||
@@ -354,7 +328,8 @@ def test_saml_role_mapping_explicit_viewer_group(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-viewer" in found_user_role_names
|
||||
|
||||
|
||||
def test_saml_role_mapping_unmapped_group_uses_default(
|
||||
@@ -380,7 +355,8 @@ def test_saml_role_mapping_unmapped_group_uses_default(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-viewer" in found_user_role_names
|
||||
|
||||
|
||||
def test_saml_update_domain_with_use_role_claim(
|
||||
@@ -457,7 +433,8 @@ def test_saml_role_mapping_role_claim_takes_precedence(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "ADMIN"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-admin" in found_user_role_names
|
||||
|
||||
|
||||
def test_saml_role_mapping_invalid_role_claim_fallback(
|
||||
@@ -487,7 +464,8 @@ def test_saml_role_mapping_invalid_role_claim_fallback(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "EDITOR"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-editor" in found_user_role_names
|
||||
|
||||
|
||||
def test_saml_role_mapping_case_insensitive(
|
||||
@@ -517,7 +495,8 @@ def test_saml_role_mapping_case_insensitive(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "ADMIN"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-admin" in found_user_role_names
|
||||
|
||||
|
||||
def test_saml_name_mapping(
|
||||
@@ -545,7 +524,8 @@ def test_saml_name_mapping(
|
||||
assert (
|
||||
found_user["displayName"] == "Jane"
|
||||
) # We are only mapping the first name here
|
||||
assert found_user["role"] == "VIEWER"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-viewer" in found_user_role_names
|
||||
|
||||
|
||||
def test_saml_empty_name_fallback(
|
||||
@@ -570,7 +550,8 @@ def test_saml_empty_name_fallback(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-viewer" in found_user_role_names
|
||||
|
||||
|
||||
def test_saml_sso_login_activates_pending_invite_user(
|
||||
@@ -613,7 +594,8 @@ def test_saml_sso_login_activates_pending_invite_user(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
assert found_user is not None
|
||||
assert found_user["status"] == "active"
|
||||
assert found_user["role"] == "VIEWER"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-viewer" in found_user_role_names
|
||||
|
||||
|
||||
def test_saml_sso_deleted_user_gets_new_user_on_login(
|
||||
@@ -680,7 +662,7 @@ def test_saml_sso_deleted_user_gets_new_user_on_login(
|
||||
|
||||
# Verify a NEW active user was auto-provisioned via SSO
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
@@ -694,4 +676,7 @@ def test_saml_sso_deleted_user_gets_new_user_on_login(
|
||||
)
|
||||
assert found_user is not None
|
||||
assert found_user["status"] == "active"
|
||||
assert found_user["role"] == "VIEWER" # default role from SSO domain config
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert (
|
||||
"signoz-viewer" in found_user_role_names
|
||||
) # default role from SSO domain config
|
||||
|
||||
@@ -11,11 +11,8 @@ from fixtures.auth import (
|
||||
USER_ADMIN_PASSWORD,
|
||||
add_license,
|
||||
)
|
||||
from fixtures.idputils import (
|
||||
get_oidc_domain,
|
||||
get_user_by_email,
|
||||
perform_oidc_login,
|
||||
)
|
||||
from fixtures.idputils import get_oidc_domain, perform_oidc_login
|
||||
from fixtures.utils import get_user_by_email, get_user_role_names
|
||||
from fixtures.types import Operation, SigNoz, TestContainerDocker, TestContainerIDP
|
||||
|
||||
|
||||
@@ -112,26 +109,12 @@ def test_oidc_authn(
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
# Assert that the user was created in signoz.
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
user_response = response.json()["data"]
|
||||
found_user = next(
|
||||
(
|
||||
user
|
||||
for user in user_response
|
||||
if user["email"] == "viewer@oidc.integration.test"
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
found_user = get_user_by_email(signoz, admin_token, "viewer@oidc.integration.test")
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
|
||||
# Confirm role
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-viewer" in found_user_role_names
|
||||
|
||||
|
||||
def test_oidc_update_domain_with_group_mappings(
|
||||
@@ -208,7 +191,8 @@ def test_oidc_role_mapping_single_group_admin(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "ADMIN"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-admin" in found_user_role_names
|
||||
|
||||
|
||||
def test_oidc_role_mapping_single_group_editor(
|
||||
@@ -234,7 +218,8 @@ def test_oidc_role_mapping_single_group_editor(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "EDITOR"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-editor" in found_user_role_names
|
||||
|
||||
|
||||
def test_oidc_role_mapping_multiple_groups_highest_wins(
|
||||
@@ -264,7 +249,8 @@ def test_oidc_role_mapping_multiple_groups_highest_wins(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "ADMIN"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-admin" in found_user_role_names
|
||||
|
||||
|
||||
def test_oidc_role_mapping_explicit_viewer_group(
|
||||
@@ -291,7 +277,8 @@ def test_oidc_role_mapping_explicit_viewer_group(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-viewer" in found_user_role_names
|
||||
|
||||
|
||||
def test_oidc_role_mapping_unmapped_group_uses_default(
|
||||
@@ -317,7 +304,8 @@ def test_oidc_role_mapping_unmapped_group_uses_default(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-viewer" in found_user_role_names
|
||||
|
||||
|
||||
def test_oidc_update_domain_with_use_role_claim(
|
||||
@@ -397,7 +385,8 @@ def test_oidc_role_mapping_role_claim_takes_precedence(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "ADMIN"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-admin" in found_user_role_names
|
||||
|
||||
|
||||
def test_oidc_role_mapping_invalid_role_claim_fallback(
|
||||
@@ -429,7 +418,8 @@ def test_oidc_role_mapping_invalid_role_claim_fallback(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "EDITOR"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-editor" in found_user_role_names
|
||||
|
||||
|
||||
def test_oidc_role_mapping_case_insensitive(
|
||||
@@ -459,7 +449,8 @@ def test_oidc_role_mapping_case_insensitive(
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "EDITOR"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-editor" in found_user_role_names
|
||||
|
||||
|
||||
def test_oidc_name_mapping(
|
||||
@@ -482,20 +473,13 @@ def test_oidc_name_mapping(
|
||||
)
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=5,
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
users = response.json()["data"]
|
||||
found_user = next((u for u in users if u["email"] == email), None)
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
# Keycloak concatenates firstName + lastName into "name" claim
|
||||
assert found_user["displayName"] == "John Doe"
|
||||
assert found_user["role"] == "VIEWER" # Default role
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-viewer" in found_user_role_names # Default role
|
||||
|
||||
|
||||
def test_oidc_empty_name_uses_fallback(
|
||||
@@ -518,19 +502,12 @@ def test_oidc_empty_name_uses_fallback(
|
||||
)
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=5,
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
users = response.json()["data"]
|
||||
found_user = next((u for u in users if u["email"] == email), None)
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
# User should still be created even with empty name
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-viewer" in found_user_role_names
|
||||
# Note: displayName may be empty - this is a known limitation
|
||||
|
||||
|
||||
@@ -570,16 +547,9 @@ def test_oidc_sso_login_activates_pending_invite_user(
|
||||
signoz, idp, driver, get_session_context, idp_login, email, "password123"
|
||||
)
|
||||
|
||||
# User should be active with ADMIN role from invite, not VIEWER from SSO
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
found_user = next(
|
||||
(user for user in response.json()["data"] if user["email"] == email),
|
||||
None,
|
||||
)
|
||||
# User should be active with VIEWER role from SSO
|
||||
found_user = get_user_by_email(signoz, admin_token, email)
|
||||
assert found_user is not None
|
||||
assert found_user["status"] == "active"
|
||||
assert found_user["role"] == "VIEWER"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-viewer" in found_user_role_names
|
||||
|
||||
@@ -5,6 +5,7 @@ import requests
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.logger import setup_logger
|
||||
from fixtures.utils import get_user_by_email, get_user_role_names
|
||||
|
||||
logger = setup_logger(__name__)
|
||||
|
||||
@@ -74,31 +75,10 @@ def test_register(signoz: types.SigNoz, get_token: Callable[[str, str], str]) ->
|
||||
|
||||
admin_token = get_token("admin@integration.test", "password123Z$")
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
user_response = response.json()["data"]
|
||||
found_user = next(
|
||||
(user for user in user_response if user["email"] == "admin@integration.test"),
|
||||
None,
|
||||
)
|
||||
|
||||
found_user = get_user_by_email(signoz, admin_token, "admin@integration.test")
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "ADMIN"
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v1/user/{found_user["id"]}"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["data"]["role"] == "ADMIN"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-admin" in found_user_role_names
|
||||
|
||||
|
||||
def test_invite_and_register(
|
||||
@@ -120,21 +100,11 @@ def test_invite_and_register(
|
||||
assert invited_user["role"] == "EDITOR"
|
||||
|
||||
# Verify the user user appears in the users list but as pending_invite status
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
user_response = response.json()["data"]
|
||||
found_user = next(
|
||||
(user for user in user_response if user["email"] == "editor@integration.test"),
|
||||
None,
|
||||
)
|
||||
found_user = get_user_by_email(signoz, admin_token, "editor@integration.test")
|
||||
assert found_user is not None
|
||||
assert found_user["status"] == "pending_invite"
|
||||
assert found_user["role"] == "EDITOR"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-editor" in found_user_role_names
|
||||
|
||||
reset_token = invited_user["token"]
|
||||
|
||||
@@ -152,7 +122,7 @@ def test_invite_and_register(
|
||||
|
||||
# Verify that an admin endpoint cannot be called by the editor user
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {editor_token}"},
|
||||
)
|
||||
@@ -160,24 +130,12 @@ def test_invite_and_register(
|
||||
assert response.status_code == HTTPStatus.FORBIDDEN
|
||||
|
||||
# Verify that the editor user status has been updated to ACTIVE
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={
|
||||
"Authorization": f"Bearer {get_token("admin@integration.test", "password123Z$")}"
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
user_response = response.json()["data"]
|
||||
found_user = next(
|
||||
(user for user in user_response if user["email"] == "editor@integration.test"),
|
||||
None,
|
||||
)
|
||||
admin_token = get_token("admin@integration.test", "password123Z$")
|
||||
found_user = get_user_by_email(signoz, admin_token, "editor@integration.test")
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "EDITOR"
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-editor" in found_user_role_names
|
||||
assert found_user["displayName"] == "editor"
|
||||
assert found_user["email"] == "editor@integration.test"
|
||||
assert found_user["status"] == "active"
|
||||
@@ -221,25 +179,7 @@ def test_self_access(
|
||||
) -> None:
|
||||
admin_token = get_token("admin@integration.test", "password123Z$")
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
user_response = response.json()["data"]
|
||||
found_user = next(
|
||||
(user for user in user_response if user["email"] == "editor@integration.test"),
|
||||
None,
|
||||
)
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v1/user/{found_user['id']}"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["data"]["role"] == "EDITOR"
|
||||
found_user = get_user_by_email(signoz, admin_token, "editor@integration.test")
|
||||
assert found_user is not None
|
||||
found_user_role_names = get_user_role_names(signoz, admin_token, found_user["id"])
|
||||
assert "signoz-editor" in found_user_role_names
|
||||
|
||||
@@ -26,7 +26,7 @@ def test_api_key(signoz: types.SigNoz, get_token: Callable[[str, str], str]) ->
|
||||
assert "token" in pat_response["data"]
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users"),
|
||||
timeout=2,
|
||||
headers={"SIGNOZ-API-KEY": f"{pat_response["data"]["token"]}"},
|
||||
)
|
||||
@@ -85,7 +85,7 @@ def test_api_key_role(
|
||||
assert "token" in pat_response["data"]
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users"),
|
||||
timeout=2,
|
||||
headers={"SIGNOZ-API-KEY": f"{pat_response["data"]["token"]}"},
|
||||
)
|
||||
@@ -109,7 +109,7 @@ def test_api_key_role(
|
||||
assert "token" in pat_response["data"]
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users"),
|
||||
timeout=2,
|
||||
headers={"SIGNOZ-API-KEY": f"{pat_response["data"]["token"]}"},
|
||||
)
|
||||
|
||||
@@ -6,6 +6,7 @@ from sqlalchemy import sql
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.logger import setup_logger
|
||||
from fixtures.utils import get_user_by_email
|
||||
|
||||
logger = setup_logger(__name__)
|
||||
|
||||
@@ -35,23 +36,10 @@ def test_change_password(
|
||||
assert response.status_code == HTTPStatus.NO_CONTENT
|
||||
|
||||
# Get the user id
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
user_response = response.json()["data"]
|
||||
found_user = next(
|
||||
(
|
||||
user
|
||||
for user in user_response
|
||||
if user["email"] == "admin+password@integration.test"
|
||||
),
|
||||
None,
|
||||
found_user = get_user_by_email(
|
||||
signoz, admin_token, "admin+password@integration.test"
|
||||
)
|
||||
assert found_user is not None
|
||||
|
||||
# Try logging in with the password
|
||||
token = get_token("admin+password@integration.test", "password123Z$")
|
||||
@@ -100,23 +88,10 @@ def test_reset_password(
|
||||
admin_token = get_token("admin@integration.test", "password123Z$")
|
||||
|
||||
# Get the user id for admin+password@integration.test
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
user_response = response.json()["data"]
|
||||
found_user = next(
|
||||
(
|
||||
user
|
||||
for user in user_response
|
||||
if user["email"] == "admin+password@integration.test"
|
||||
),
|
||||
None,
|
||||
found_user = get_user_by_email(
|
||||
signoz, admin_token, "admin+password@integration.test"
|
||||
)
|
||||
assert found_user is not None
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get(
|
||||
@@ -158,23 +133,10 @@ def test_reset_password_with_no_password(
|
||||
admin_token = get_token("admin@integration.test", "password123Z$")
|
||||
|
||||
# Get the user id for admin+password@integration.test
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
user_response = response.json()["data"]
|
||||
found_user = next(
|
||||
(
|
||||
user
|
||||
for user in user_response
|
||||
if user["email"] == "admin+password@integration.test"
|
||||
),
|
||||
None,
|
||||
found_user = get_user_by_email(
|
||||
signoz, admin_token, "admin+password@integration.test"
|
||||
)
|
||||
assert found_user is not None
|
||||
|
||||
with signoz.sqlstore.conn.connect() as conn:
|
||||
result = conn.execute(
|
||||
@@ -305,17 +267,7 @@ def test_forgot_password_creates_reset_token(
|
||||
|
||||
# Verify reset password token was created by querying the database
|
||||
# First, get the user ID
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
user_response = response.json()["data"]
|
||||
found_user = next(
|
||||
(user for user in user_response if user["email"] == "forgot@integration.test"),
|
||||
None,
|
||||
)
|
||||
found_user = get_user_by_email(signoz, admin_token, "forgot@integration.test")
|
||||
assert found_user is not None
|
||||
|
||||
reset_token = None
|
||||
@@ -371,17 +323,7 @@ def test_reset_password_with_expired_token(
|
||||
admin_token = get_token("admin@integration.test", "password123Z$")
|
||||
|
||||
# Get user ID for the forgot@integration.test user
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
user_response = response.json()["data"]
|
||||
found_user = next(
|
||||
(user for user in user_response if user["email"] == "forgot@integration.test"),
|
||||
None,
|
||||
)
|
||||
found_user = get_user_by_email(signoz, admin_token, "forgot@integration.test")
|
||||
assert found_user is not None
|
||||
|
||||
# Get org ID
|
||||
|
||||
@@ -4,6 +4,12 @@ from typing import Callable, Tuple
|
||||
import requests
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.utils import (
|
||||
add_user_role,
|
||||
get_user_role_names,
|
||||
remove_user_role_by_name,
|
||||
set_user_roles,
|
||||
)
|
||||
|
||||
|
||||
def test_change_role(
|
||||
@@ -40,7 +46,7 @@ def test_change_role(
|
||||
)
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user/me"),
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
@@ -58,22 +64,13 @@ def test_change_role(
|
||||
|
||||
assert response.status_code == HTTPStatus.FORBIDDEN
|
||||
|
||||
# Change the new user's role - move to ADMIN
|
||||
response = requests.put(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v1/user/{new_user_id}"),
|
||||
json={
|
||||
"displayName": "role change user",
|
||||
"role": "ADMIN",
|
||||
},
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
# Change the new user's role - add ADMIN, remove VIEWER
|
||||
add_user_role(signoz, admin_token, new_user_id, "signoz-admin")
|
||||
remove_user_role_by_name(signoz, admin_token, new_user_id, "signoz-viewer")
|
||||
|
||||
# Make some API calls again
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user/me"),
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
@@ -106,3 +103,268 @@ def test_change_role(
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
|
||||
def test_remove_all_roles(
|
||||
signoz: types.SigNoz,
|
||||
get_token: Callable[[str, str], str],
|
||||
get_tokens: Callable[[str, str], Tuple[str, str]],
|
||||
):
|
||||
admin_token = get_token("admin@integration.test", "password123Z$")
|
||||
|
||||
# Create a new user as EDITOR
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/invite"),
|
||||
json={"email": "admin+noroles@integration.test", "role": "EDITOR"},
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.CREATED
|
||||
|
||||
invited_user = response.json()["data"]
|
||||
reset_token = invited_user["token"]
|
||||
|
||||
# Activate user via reset password
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/resetPassword"),
|
||||
json={"password": "password123Z$", "token": reset_token},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.NO_CONTENT
|
||||
|
||||
# Login and get user id
|
||||
new_user_token, new_user_refresh_token = get_tokens(
|
||||
"admin+noroles@integration.test", "password123Z$"
|
||||
)
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
new_user_id = response.json()["data"]["id"]
|
||||
|
||||
# Validate the user has the editor role
|
||||
role_names = get_user_role_names(signoz, admin_token, new_user_id)
|
||||
assert role_names is not None
|
||||
assert "signoz-editor" in role_names
|
||||
|
||||
# Remove all roles via DELETE endpoint
|
||||
set_user_roles(signoz, admin_token, new_user_id, [])
|
||||
|
||||
# Validate the user has no roles
|
||||
role_names = get_user_role_names(signoz, admin_token, new_user_id)
|
||||
assert role_names is None or len(role_names) == 0
|
||||
|
||||
# Old token should be invalidated after role change
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
# Token rotation should also fail for a user with no roles
|
||||
# (the session endpoint requires roles to build an identity)
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/sessions/rotate"),
|
||||
json={
|
||||
"refreshToken": new_user_refresh_token,
|
||||
},
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert (
|
||||
response.status_code != HTTPStatus.OK
|
||||
), "token rotation should fail for user with no roles"
|
||||
|
||||
|
||||
def test_multiple_roles(
|
||||
signoz: types.SigNoz,
|
||||
get_token: Callable[[str, str], str],
|
||||
get_tokens: Callable[[str, str], Tuple[str, str]],
|
||||
):
|
||||
admin_token = get_token("admin@integration.test", "password123Z$")
|
||||
|
||||
# Create a new user as VIEWER
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/invite"),
|
||||
json={"email": "admin+multirole@integration.test", "role": "VIEWER"},
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.CREATED
|
||||
|
||||
invited_user = response.json()["data"]
|
||||
reset_token = invited_user["token"]
|
||||
|
||||
# Activate user via reset password
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/resetPassword"),
|
||||
json={"password": "password123Z$", "token": reset_token},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.NO_CONTENT
|
||||
|
||||
# Login and get user id
|
||||
new_user_token, new_user_refresh_token = get_tokens(
|
||||
"admin+multirole@integration.test", "password123Z$"
|
||||
)
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
new_user_id = response.json()["data"]["id"]
|
||||
|
||||
# Validate user starts with viewer role
|
||||
role_names = get_user_role_names(signoz, admin_token, new_user_id)
|
||||
assert role_names is not None
|
||||
assert role_names == [
|
||||
"signoz-viewer"
|
||||
], f"expected ['signoz-viewer'], got {role_names}"
|
||||
|
||||
# As viewer, admin-only APIs should be forbidden
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/org/preferences"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.FORBIDDEN
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.FORBIDDEN
|
||||
|
||||
# Assign multiple roles: add editor (viewer already assigned)
|
||||
add_user_role(signoz, admin_token, new_user_id, "signoz-editor")
|
||||
|
||||
# Validate user has both roles
|
||||
role_names = get_user_role_names(signoz, admin_token, new_user_id)
|
||||
assert role_names is not None
|
||||
assert sorted(role_names) == [
|
||||
"signoz-editor",
|
||||
"signoz-viewer",
|
||||
], f"expected ['signoz-editor', 'signoz-viewer'], got {sorted(role_names)}"
|
||||
|
||||
# Rotate token to pick up new roles
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/sessions/rotate"),
|
||||
json={"refreshToken": new_user_refresh_token},
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
rotate_response = response.json()["data"]
|
||||
new_user_token = rotate_response["accessToken"]
|
||||
new_user_refresh_token = rotate_response["refreshToken"]
|
||||
|
||||
# Verify /me includes both roles
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
me_role_names = sorted(
|
||||
ur["role"]["name"] for ur in response.json()["data"]["userRoles"]
|
||||
)
|
||||
assert me_role_names == [
|
||||
"signoz-editor",
|
||||
"signoz-viewer",
|
||||
], f"expected ['signoz-editor', 'signoz-viewer'] in /me, got {me_role_names}"
|
||||
|
||||
# Editor+viewer still cannot access admin-only APIs
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/org/preferences"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.FORBIDDEN
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.FORBIDDEN
|
||||
|
||||
# Add admin role (editor + viewer already assigned)
|
||||
add_user_role(signoz, admin_token, new_user_id, "signoz-admin")
|
||||
|
||||
role_names = get_user_role_names(signoz, admin_token, new_user_id)
|
||||
assert sorted(role_names) == [
|
||||
"signoz-admin",
|
||||
"signoz-editor",
|
||||
"signoz-viewer",
|
||||
], f"expected all three roles, got {sorted(role_names)}"
|
||||
|
||||
# Rotate token to pick up admin role
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/sessions/rotate"),
|
||||
json={"refreshToken": new_user_refresh_token},
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
rotate_response = response.json()["data"]
|
||||
new_user_token = rotate_response["accessToken"]
|
||||
new_user_refresh_token = rotate_response["refreshToken"]
|
||||
|
||||
# Now with admin role, admin-only APIs should succeed
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/org/preferences"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
# Reduce back to single viewer role
|
||||
set_user_roles(signoz, admin_token, new_user_id, ["signoz-viewer"])
|
||||
|
||||
role_names = get_user_role_names(signoz, admin_token, new_user_id)
|
||||
assert role_names == [
|
||||
"signoz-viewer"
|
||||
], f"expected ['signoz-viewer'] after reduction, got {role_names}"
|
||||
|
||||
# Rotate token to pick up reduced roles
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/sessions/rotate"),
|
||||
json={"refreshToken": new_user_refresh_token},
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
rotate_response = response.json()["data"]
|
||||
new_user_token = rotate_response["accessToken"]
|
||||
|
||||
# After reducing to viewer, admin-only APIs should be forbidden again
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/org/preferences"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.FORBIDDEN
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {new_user_token}"},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.FORBIDDEN
|
||||
|
||||
@@ -52,7 +52,7 @@ def test_root_user_signoz_admin_assignment(
|
||||
|
||||
# Get the user from the /user/me endpoint and extract the id
|
||||
user_response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user/me"),
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
|
||||
@@ -47,7 +47,7 @@ def test_user_invite_accept_role_grant(
|
||||
# Login with editor email and password
|
||||
editor_token = get_token(USER_EDITOR_EMAIL, USER_EDITOR_PASSWORD)
|
||||
user_me_response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user/me"),
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
headers={"Authorization": f"Bearer {editor_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
@@ -102,7 +102,7 @@ def test_user_update_role_grant(
|
||||
# Get the editor user's id
|
||||
editor_token = get_token(USER_EDITOR_EMAIL, USER_EDITOR_PASSWORD)
|
||||
user_me_response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user/me"),
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
headers={"Authorization": f"Bearer {editor_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
@@ -120,15 +120,37 @@ def test_user_update_role_grant(
|
||||
roles_data = roles_response.json()["data"]
|
||||
org_id = roles_data[0]["orgId"]
|
||||
|
||||
# Update the user's role to viewer
|
||||
update_payload = {"role": "VIEWER"}
|
||||
update_response = requests.put(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v1/user/{editor_id}"),
|
||||
json=update_payload,
|
||||
# Add the viewer role to the user
|
||||
add_response = requests.post(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v2/users/{editor_id}/roles"),
|
||||
json={"name": "signoz-viewer"},
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert update_response.status_code == HTTPStatus.OK
|
||||
assert add_response.status_code == HTTPStatus.OK
|
||||
|
||||
# Get the editor role id so we can remove it
|
||||
roles_list_response = requests.get(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v2/users/{editor_id}/roles"),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert roles_list_response.status_code == HTTPStatus.OK
|
||||
editor_role_id = next(
|
||||
r["id"]
|
||||
for r in roles_list_response.json()["data"]
|
||||
if r["name"] == "signoz-editor"
|
||||
)
|
||||
|
||||
# Remove the editor role
|
||||
remove_response = requests.delete(
|
||||
signoz.self.host_configs["8080"].get(
|
||||
f"/api/v2/users/{editor_id}/roles/{editor_role_id}"
|
||||
),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert remove_response.status_code == HTTPStatus.NO_CONTENT
|
||||
|
||||
# Check that user no longer has the editor role in the db
|
||||
with signoz.sqlstore.conn.connect() as conn:
|
||||
@@ -179,7 +201,7 @@ def test_user_delete_role_revoke(
|
||||
# login with editor to get the user_id and check if user exists
|
||||
editor_token = get_token(USER_EDITOR_EMAIL, USER_EDITOR_PASSWORD)
|
||||
user_me_response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user/me"),
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
headers={"Authorization": f"Bearer {editor_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
@@ -222,3 +244,120 @@ def test_user_delete_role_revoke(
|
||||
else:
|
||||
_user = f"user:organization/{org_id}/user/{editor_id}"
|
||||
assert row["_user"] != _user
|
||||
|
||||
|
||||
def test_update_my_user(
|
||||
signoz: SigNoz,
|
||||
create_user_admin: Operation, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
):
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
# Invite a viewer user
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/invite"),
|
||||
json={"email": "admin+updateme@integration.test", "role": "VIEWER"},
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.CREATED
|
||||
reset_token = response.json()["data"]["token"]
|
||||
|
||||
# Activate user
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/resetPassword"),
|
||||
json={"password": "password123Z$", "token": reset_token},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.NO_CONTENT
|
||||
|
||||
# Login as viewer
|
||||
viewer_token = get_token("admin+updateme@integration.test", "password123Z$")
|
||||
|
||||
# Update own display name via PUT /api/v2/users/me
|
||||
response = requests.put(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
json={"displayName": "viewer updated name"},
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.NO_CONTENT
|
||||
|
||||
# Verify the update via GET /api/v2/users/me
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["data"]["displayName"] == "viewer updated name"
|
||||
|
||||
|
||||
def test_update_user_by_id(
|
||||
signoz: SigNoz,
|
||||
create_user_admin: Operation, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
):
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
# Invite a user to update
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/invite"),
|
||||
json={"email": "admin+updatetest@integration.test", "role": "VIEWER"},
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.CREATED
|
||||
reset_token = response.json()["data"]["token"]
|
||||
|
||||
# Activate user
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/resetPassword"),
|
||||
json={"password": "password123Z$", "token": reset_token},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.NO_CONTENT
|
||||
|
||||
# Get user id
|
||||
user_token = get_token("admin+updatetest@integration.test", "password123Z$")
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
headers={"Authorization": f"Bearer {user_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
user_id = response.json()["data"]["id"]
|
||||
|
||||
# Admin updates user's display name via PUT /api/v2/users/{id}
|
||||
response = requests.put(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user_id}"),
|
||||
json={"displayName": "renamed user"},
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.NO_CONTENT
|
||||
|
||||
# Verify via GET /api/v2/users/{id}
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user_id}"),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["data"]["displayName"] == "renamed user"
|
||||
|
||||
# Self-update should be rejected
|
||||
me_response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
admin_id = me_response.json()["data"]["id"]
|
||||
|
||||
response = requests.put(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v2/users/{admin_id}"),
|
||||
json={"displayName": "self update"},
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.BAD_REQUEST
|
||||
|
||||
@@ -14,7 +14,7 @@ def test_root_user_created(signoz: types.SigNoz) -> None:
|
||||
The root user service reconciles asynchronously after startup.
|
||||
|
||||
Phase 1: Poll /api/v1/version until setupCompleted=true.
|
||||
Phase 2: Poll /api/v1/user until it returns 200, confirming the root
|
||||
Phase 2: Poll /api/v2/users until it returns 200, confirming the root
|
||||
user actually exists and the impersonation provider works.
|
||||
"""
|
||||
# Phase 1: wait for setupCompleted
|
||||
@@ -39,13 +39,13 @@ def test_root_user_created(signoz: types.SigNoz) -> None:
|
||||
# Phase 2: wait for root user to be fully resolved
|
||||
for attempt in range(15):
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users"),
|
||||
timeout=2,
|
||||
)
|
||||
if response.status_code == HTTPStatus.OK:
|
||||
return
|
||||
logger.info(
|
||||
"Attempt %s: /api/v1/user returned %s, retrying ...",
|
||||
"Attempt %s: /api/v2/users returned %s, retrying ...",
|
||||
attempt + 1,
|
||||
response.status_code,
|
||||
)
|
||||
|
||||
@@ -4,6 +4,7 @@ import requests
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.logger import setup_logger
|
||||
from fixtures.utils import get_user_role_names
|
||||
|
||||
logger = setup_logger(__name__)
|
||||
|
||||
@@ -32,7 +33,7 @@ def test_impersonated_user_is_admin(signoz: types.SigNoz) -> None:
|
||||
Listing users is an admin-only endpoint.
|
||||
"""
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
signoz.self.host_configs["8080"].get("/api/v2/users"),
|
||||
timeout=2,
|
||||
)
|
||||
|
||||
@@ -46,4 +47,5 @@ def test_impersonated_user_is_admin(signoz: types.SigNoz) -> None:
|
||||
None,
|
||||
)
|
||||
assert root_user is not None
|
||||
assert root_user["role"] == "ADMIN"
|
||||
root_user_role_names = get_user_role_names(signoz, None, root_user["id"])
|
||||
assert "signoz-admin" in root_user_role_names
|
||||
|
||||
Reference in New Issue
Block a user