Compare commits

...

2 Commits

Author SHA1 Message Date
vikrantgupta25
a458f35d65 test(authz): rework role integration tests onto the new CRUD APIs
Migrate the role integration suite off the deprecated PATCH endpoints and
onto the current declarative role CRUD APIs (Create/Get/List/Update/Delete
with full transactionGroups).

- role/01_register.py: verify managed roles via GetRole's transactionGroups
  against a golden matrix in testdata/role/managed_role_grants.json (no more
  DB tuple assertions).
- role/02_crud.py (new): custom-role CRUD lifecycle, declarative update,
  validation (naming, invalid verb/type/kind/selector, duplicate, managed
  immutability, delete-with-assignee), and license gating.
- role/03_fga.py: resource FGA allow/deny via declarative grant sets.
- role/02_user.py: deleted; user role-membership is covered by the
  passwordauthn suite.
- serviceaccount/06_fga.py: migrated to declarative grant PUTs.
- fixtures/role.py: pure data helpers + find_role_id fixture; tests make
  their HTTP calls directly.
2026-07-01 16:40:23 +05:30
vikrantgupta25
0045c675d4 chore(authz): delete the deprecated authz apis 2026-07-01 15:41:04 +05:30
17 changed files with 1292 additions and 1911 deletions

View File

@@ -618,13 +618,6 @@ components:
provider:
$ref: '#/components/schemas/AuthtypesAuthNProvider'
type: object
AuthtypesPatchableRole:
properties:
description:
type: string
required:
- description
type: object
AuthtypesPostableAuthDomain:
properties:
config:
@@ -2536,22 +2529,6 @@ components:
- resource
- selectors
type: object
CoretypesPatchableObjects:
properties:
additions:
items:
$ref: '#/components/schemas/CoretypesObjectGroup'
nullable: true
type: array
deletions:
items:
$ref: '#/components/schemas/CoretypesObjectGroup'
nullable: true
type: array
required:
- additions
- deletions
type: object
CoretypesResourceRef:
properties:
kind:
@@ -11825,68 +11802,6 @@ paths:
summary: Get role
tags:
- role
patch:
deprecated: true
description: This endpoint patches a role
operationId: PatchRole
parameters:
- in: path
name: id
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/AuthtypesPatchableRole'
responses:
"204":
description: No Content
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"451":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unavailable For Legal Reasons
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
"501":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Implemented
security:
- api_key:
- role:update
- tokenizer:
- role:update
summary: Patch role
tags:
- role
put:
deprecated: false
description: This endpoint updates a role
@@ -11949,158 +11864,6 @@ paths:
summary: Update role
tags:
- role
/api/v1/roles/{id}/relations/{relation}/objects:
get:
deprecated: false
description: Gets all objects connected to the specified role via a given relation
type
operationId: GetObjects
parameters:
- in: path
name: id
required: true
schema:
type: string
- in: path
name: relation
required: true
schema:
type: string
responses:
"200":
content:
application/json:
schema:
properties:
data:
items:
$ref: '#/components/schemas/CoretypesObjectGroup'
type: array
status:
type: string
required:
- status
- data
type: object
description: OK
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"451":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unavailable For Legal Reasons
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
"501":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Implemented
security:
- api_key:
- role:read
- tokenizer:
- role:read
summary: Get objects for a role by relation
tags:
- role
patch:
deprecated: true
description: Patches the objects connected to the specified role via a given
relation type
operationId: PatchObjects
parameters:
- in: path
name: id
required: true
schema:
type: string
- in: path
name: relation
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/CoretypesPatchableObjects'
responses:
"204":
description: No Content
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"451":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unavailable For Legal Reasons
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
"501":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Implemented
security:
- api_key:
- role:update
- tokenizer:
- role:update
summary: Patch objects for a role by relation
tags:
- role
/api/v1/route_policies:
get:
deprecated: false

View File

@@ -260,40 +260,6 @@ func (provider *provider) GetWithTransactionGroups(ctx context.Context, orgID va
return authtypes.MakeRoleWithTransactionGroups(role, transactionGroups), nil
}
func (provider *provider) GetObjects(ctx context.Context, orgID valuer.UUID, id valuer.UUID, relation authtypes.Relation) ([]*coretypes.Object, error) {
_, err := provider.licensing.GetActive(ctx, orgID)
if err != nil {
return nil, errors.New(errors.TypeLicenseUnavailable, errors.CodeLicenseUnavailable, "a valid license is not available").WithAdditional("this feature requires a valid license").WithAdditional(err.Error())
}
storableRole, err := provider.store.Get(ctx, orgID, id)
if err != nil {
return nil, err
}
objects := make([]*coretypes.Object, 0)
for _, objectType := range provider.registry.Types() {
if coretypes.ErrIfVerbNotValidForType(relation.Verb, objectType) != nil {
continue
}
resourceObjects, err := provider.
ListObjects(
ctx,
authtypes.MustNewSubject(coretypes.NewResourceRole(), storableRole.Name, orgID, &coretypes.VerbAssignee),
relation,
objectType,
)
if err != nil {
return nil, err
}
objects = append(objects, resourceObjects...)
}
return objects, nil
}
func (provider *provider) Update(ctx context.Context, orgID valuer.UUID, updatedRole *authtypes.RoleWithTransactionGroups) error {
_, err := provider.licensing.GetActive(ctx, orgID)
if err != nil {
@@ -324,39 +290,6 @@ func (provider *provider) Update(ctx context.Context, orgID valuer.UUID, updated
return provider.store.Update(ctx, orgID, updatedRole.Role)
}
func (provider *provider) Patch(ctx context.Context, orgID valuer.UUID, role *authtypes.Role) error {
_, err := provider.licensing.GetActive(ctx, orgID)
if err != nil {
return errors.New(errors.TypeLicenseUnavailable, errors.CodeLicenseUnavailable, "a valid license is not available").WithAdditional("this feature requires a valid license").WithAdditional(err.Error())
}
return provider.store.Update(ctx, orgID, role)
}
func (provider *provider) PatchObjects(ctx context.Context, orgID valuer.UUID, name string, relation authtypes.Relation, additions, deletions []*coretypes.Object) error {
_, err := provider.licensing.GetActive(ctx, orgID)
if err != nil {
return errors.New(errors.TypeLicenseUnavailable, errors.CodeLicenseUnavailable, "a valid license is not available").WithAdditional("this feature requires a valid license").WithAdditional(err.Error())
}
additionTuples, err := authtypes.GetAdditionTuples(name, orgID, relation, additions)
if err != nil {
return err
}
deletionTuples, err := authtypes.GetDeletionTuples(name, orgID, relation, deletions)
if err != nil {
return err
}
err = provider.Write(ctx, additionTuples, deletionTuples)
if err != nil {
return err
}
return nil
}
func (provider *provider) Delete(ctx context.Context, orgID valuer.UUID, id valuer.UUID) error {
_, err := provider.licensing.GetActive(ctx, orgID)
if err != nil {

View File

@@ -18,19 +18,13 @@ import type {
} from 'react-query';
import type {
AuthtypesPatchableRoleDTO,
AuthtypesPostableRoleDTO,
AuthtypesUpdatableRoleDTO,
CoretypesPatchableObjectsDTO,
CreateRole201,
DeleteRolePathParameters,
GetObjects200,
GetObjectsPathParameters,
GetRole200,
GetRolePathParameters,
ListRoles200,
PatchObjectsPathParameters,
PatchRolePathParameters,
RenderErrorResponseDTO,
UpdateRolePathParameters,
} from '../sigNoz.schemas';
@@ -365,107 +359,6 @@ export const invalidateGetRole = async (
return queryClient;
};
/**
* This endpoint patches a role
* @deprecated
* @summary Patch role
*/
export const patchRole = (
{ id }: PatchRolePathParameters,
authtypesPatchableRoleDTO?: BodyType<AuthtypesPatchableRoleDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<void>({
url: `/api/v1/roles/${id}`,
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
data: authtypesPatchableRoleDTO,
signal,
});
};
export const getPatchRoleMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof patchRole>>,
TError,
{
pathParams: PatchRolePathParameters;
data?: BodyType<AuthtypesPatchableRoleDTO>;
},
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof patchRole>>,
TError,
{
pathParams: PatchRolePathParameters;
data?: BodyType<AuthtypesPatchableRoleDTO>;
},
TContext
> => {
const mutationKey = ['patchRole'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof patchRole>>,
{
pathParams: PatchRolePathParameters;
data?: BodyType<AuthtypesPatchableRoleDTO>;
}
> = (props) => {
const { pathParams, data } = props ?? {};
return patchRole(pathParams, data);
};
return { mutationFn, ...mutationOptions };
};
export type PatchRoleMutationResult = NonNullable<
Awaited<ReturnType<typeof patchRole>>
>;
export type PatchRoleMutationBody =
| BodyType<AuthtypesPatchableRoleDTO>
| undefined;
export type PatchRoleMutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @deprecated
* @summary Patch role
*/
export const usePatchRole = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof patchRole>>,
TError,
{
pathParams: PatchRolePathParameters;
data?: BodyType<AuthtypesPatchableRoleDTO>;
},
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof patchRole>>,
TError,
{
pathParams: PatchRolePathParameters;
data?: BodyType<AuthtypesPatchableRoleDTO>;
},
TContext
> => {
return useMutation(getPatchRoleMutationOptions(options));
};
/**
* This endpoint updates a role
* @summary Update role
@@ -565,205 +458,3 @@ export const useUpdateRole = <
> => {
return useMutation(getUpdateRoleMutationOptions(options));
};
/**
* Gets all objects connected to the specified role via a given relation type
* @summary Get objects for a role by relation
*/
export const getObjects = (
{ id, relation }: GetObjectsPathParameters,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<GetObjects200>({
url: `/api/v1/roles/${id}/relations/${relation}/objects`,
method: 'GET',
signal,
});
};
export const getGetObjectsQueryKey = ({
id,
relation,
}: GetObjectsPathParameters) => {
return [`/api/v1/roles/${id}/relations/${relation}/objects`] as const;
};
export const getGetObjectsQueryOptions = <
TData = Awaited<ReturnType<typeof getObjects>>,
TError = ErrorType<RenderErrorResponseDTO>,
>(
{ id, relation }: GetObjectsPathParameters,
options?: {
query?: UseQueryOptions<
Awaited<ReturnType<typeof getObjects>>,
TError,
TData
>;
},
) => {
const { query: queryOptions } = options ?? {};
const queryKey =
queryOptions?.queryKey ?? getGetObjectsQueryKey({ id, relation });
const queryFn: QueryFunction<Awaited<ReturnType<typeof getObjects>>> = ({
signal,
}) => getObjects({ id, relation }, signal);
return {
queryKey,
queryFn,
enabled: !!(id && relation),
...queryOptions,
} as UseQueryOptions<Awaited<ReturnType<typeof getObjects>>, TError, TData> & {
queryKey: QueryKey;
};
};
export type GetObjectsQueryResult = NonNullable<
Awaited<ReturnType<typeof getObjects>>
>;
export type GetObjectsQueryError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary Get objects for a role by relation
*/
export function useGetObjects<
TData = Awaited<ReturnType<typeof getObjects>>,
TError = ErrorType<RenderErrorResponseDTO>,
>(
{ id, relation }: GetObjectsPathParameters,
options?: {
query?: UseQueryOptions<
Awaited<ReturnType<typeof getObjects>>,
TError,
TData
>;
},
): UseQueryResult<TData, TError> & { queryKey: QueryKey } {
const queryOptions = getGetObjectsQueryOptions({ id, relation }, options);
const query = useQuery(queryOptions) as UseQueryResult<TData, TError> & {
queryKey: QueryKey;
};
return { ...query, queryKey: queryOptions.queryKey };
}
/**
* @summary Get objects for a role by relation
*/
export const invalidateGetObjects = async (
queryClient: QueryClient,
{ id, relation }: GetObjectsPathParameters,
options?: InvalidateOptions,
): Promise<QueryClient> => {
await queryClient.invalidateQueries(
{ queryKey: getGetObjectsQueryKey({ id, relation }) },
options,
);
return queryClient;
};
/**
* Patches the objects connected to the specified role via a given relation type
* @deprecated
* @summary Patch objects for a role by relation
*/
export const patchObjects = (
{ id, relation }: PatchObjectsPathParameters,
coretypesPatchableObjectsDTO?: BodyType<CoretypesPatchableObjectsDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<void>({
url: `/api/v1/roles/${id}/relations/${relation}/objects`,
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
data: coretypesPatchableObjectsDTO,
signal,
});
};
export const getPatchObjectsMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof patchObjects>>,
TError,
{
pathParams: PatchObjectsPathParameters;
data?: BodyType<CoretypesPatchableObjectsDTO>;
},
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof patchObjects>>,
TError,
{
pathParams: PatchObjectsPathParameters;
data?: BodyType<CoretypesPatchableObjectsDTO>;
},
TContext
> => {
const mutationKey = ['patchObjects'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof patchObjects>>,
{
pathParams: PatchObjectsPathParameters;
data?: BodyType<CoretypesPatchableObjectsDTO>;
}
> = (props) => {
const { pathParams, data } = props ?? {};
return patchObjects(pathParams, data);
};
return { mutationFn, ...mutationOptions };
};
export type PatchObjectsMutationResult = NonNullable<
Awaited<ReturnType<typeof patchObjects>>
>;
export type PatchObjectsMutationBody =
| BodyType<CoretypesPatchableObjectsDTO>
| undefined;
export type PatchObjectsMutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @deprecated
* @summary Patch objects for a role by relation
*/
export const usePatchObjects = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof patchObjects>>,
TError,
{
pathParams: PatchObjectsPathParameters;
data?: BodyType<CoretypesPatchableObjectsDTO>;
},
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof patchObjects>>,
TError,
{
pathParams: PatchObjectsPathParameters;
data?: BodyType<CoretypesPatchableObjectsDTO>;
},
TContext
> => {
return useMutation(getPatchObjectsMutationOptions(options));
};

View File

@@ -2230,13 +2230,6 @@ export interface AuthtypesOrgSessionContextDTO {
warning?: ErrorsJSONDTO;
}
export interface AuthtypesPatchableRoleDTO {
/**
* @type string
*/
description: string;
}
export interface AuthtypesPostableAuthDomainDTO {
config?: AuthtypesAuthDomainConfigDTO;
/**
@@ -3249,17 +3242,6 @@ export interface CommonJSONRefDTO {
$ref?: string;
}
export interface CoretypesPatchableObjectsDTO {
/**
* @type array,null
*/
additions: CoretypesObjectGroupDTO[] | null;
/**
* @type array,null
*/
deletions: CoretypesObjectGroupDTO[] | null;
}
export interface DashboardGridItemDTO {
content?: CommonJSONRefDTO;
/**
@@ -10248,31 +10230,9 @@ export type GetRole200 = {
status: string;
};
export type PatchRolePathParameters = {
id: string;
};
export type UpdateRolePathParameters = {
id: string;
};
export type GetObjectsPathParameters = {
id: string;
relation: string;
};
export type GetObjects200 = {
/**
* @type array
*/
data: CoretypesObjectGroupDTO[];
/**
* @type string
*/
status: string;
};
export type PatchObjectsPathParameters = {
id: string;
relation: string;
};
export type GetAllRoutePolicies200 = {
/**
* @type array

View File

@@ -145,86 +145,5 @@ func (provider *provider) addRoleRoutes(router *mux.Router) error {
return err
}
if err := router.Handle("/api/v1/roles/{id}/relations/{relation}/objects", handler.New(
provider.authzMiddleware.CheckResources(provider.authzHandler.GetObjects, authtypes.SigNozAdminRoleName),
handler.OpenAPIDef{
ID: "GetObjects",
Tags: []string{"role"},
Summary: "Get objects for a role by relation",
Description: "Gets all objects connected to the specified role via a given relation type",
Request: nil,
RequestContentType: "",
Response: make([]*coretypes.ObjectGroup, 0),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusNotFound, http.StatusNotImplemented, http.StatusUnavailableForLegalReasons},
Deprecated: false,
SecuritySchemes: newScopedSecuritySchemes([]string{coretypes.ResourceRole.Scope(coretypes.VerbRead)}),
},
handler.WithResourceDefs(handler.BasicResourceDef{
Resource: coretypes.ResourceRole,
Verb: coretypes.VerbRead,
Category: coretypes.ActionCategoryAccessControl,
ID: coretypes.PathParam("id"),
Selector: provider.roleSelector,
}),
)).Methods(http.MethodGet).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/roles/{id}", handler.New(
provider.authzMiddleware.CheckResources(provider.authzHandler.Patch, authtypes.SigNozAdminRoleName),
handler.OpenAPIDef{
ID: "PatchRole",
Tags: []string{"role"},
Summary: "Patch role",
Description: "This endpoint patches a role",
Request: new(authtypes.PatchableRole),
RequestContentType: "",
Response: nil,
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusNotFound, http.StatusNotImplemented, http.StatusUnavailableForLegalReasons},
Deprecated: true,
SecuritySchemes: newScopedSecuritySchemes([]string{coretypes.ResourceRole.Scope(coretypes.VerbUpdate)}),
},
handler.WithResourceDefs(handler.BasicResourceDef{
Resource: coretypes.ResourceRole,
Verb: coretypes.VerbUpdate,
Category: coretypes.ActionCategoryAccessControl,
ID: coretypes.PathParam("id"),
Selector: provider.roleSelector,
}),
)).Methods(http.MethodPatch).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/roles/{id}/relations/{relation}/objects", handler.New(
provider.authzMiddleware.CheckResources(provider.authzHandler.PatchObjects, authtypes.SigNozAdminRoleName),
handler.OpenAPIDef{
ID: "PatchObjects",
Tags: []string{"role"},
Summary: "Patch objects for a role by relation",
Description: "Patches the objects connected to the specified role via a given relation type",
Request: new(coretypes.PatchableObjects),
RequestContentType: "",
Response: nil,
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusNotFound, http.StatusBadRequest, http.StatusNotImplemented, http.StatusUnavailableForLegalReasons},
Deprecated: true,
SecuritySchemes: newScopedSecuritySchemes([]string{coretypes.ResourceRole.Scope(coretypes.VerbUpdate)}),
},
handler.WithResourceDefs(handler.BasicResourceDef{
Resource: coretypes.ResourceRole,
Verb: coretypes.VerbUpdate,
Category: coretypes.ActionCategoryAccessControl,
ID: coretypes.PathParam("id"),
Selector: provider.roleSelector,
}),
)).Methods(http.MethodPatch).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -39,15 +39,6 @@ type AuthZ interface {
// Gets the role if it exists or creates one.
GetOrCreate(context.Context, valuer.UUID, *authtypes.Role) (*authtypes.Role, error)
// Gets the objects associated with the given role and relation.
GetObjects(context.Context, valuer.UUID, valuer.UUID, authtypes.Relation) ([]*coretypes.Object, error)
// Patches the role.
Patch(context.Context, valuer.UUID, *authtypes.Role) error
// Patches the objects in authorization server associated with the given role and relation
PatchObjects(context.Context, valuer.UUID, string, authtypes.Relation, []*coretypes.Object, []*coretypes.Object) error
// Updates the role's metadata and reconciles its transaction groups.
Update(context.Context, valuer.UUID, *authtypes.RoleWithTransactionGroups) error
@@ -102,14 +93,8 @@ type Handler interface {
Get(http.ResponseWriter, *http.Request)
GetObjects(http.ResponseWriter, *http.Request)
List(http.ResponseWriter, *http.Request)
Patch(http.ResponseWriter, *http.Request)
PatchObjects(http.ResponseWriter, *http.Request)
Update(http.ResponseWriter, *http.Request)
Check(http.ResponseWriter, *http.Request)

View File

@@ -189,22 +189,10 @@ func (provider *provider) GetOrCreate(_ context.Context, _ valuer.UUID, _ *autht
return nil, errors.Newf(errors.TypeUnsupported, authtypes.ErrCodeRoleUnsupported, "not implemented")
}
func (provider *provider) GetObjects(ctx context.Context, orgID valuer.UUID, id valuer.UUID, relation authtypes.Relation) ([]*coretypes.Object, error) {
return nil, errors.Newf(errors.TypeUnsupported, authtypes.ErrCodeRoleUnsupported, "not implemented")
}
func (provider *provider) Update(_ context.Context, _ valuer.UUID, _ *authtypes.RoleWithTransactionGroups) error {
return errors.Newf(errors.TypeUnsupported, authtypes.ErrCodeRoleUnsupported, "not implemented")
}
func (provider *provider) Patch(_ context.Context, _ valuer.UUID, _ *authtypes.Role) error {
return errors.Newf(errors.TypeUnsupported, authtypes.ErrCodeRoleUnsupported, "not implemented")
}
func (provider *provider) PatchObjects(_ context.Context, _ valuer.UUID, _ string, _ authtypes.Relation, _, _ []*coretypes.Object) error {
return errors.Newf(errors.TypeUnsupported, authtypes.ErrCodeRoleUnsupported, "not implemented")
}
func (provider *provider) Delete(_ context.Context, _ valuer.UUID, _ valuer.UUID) error {
return errors.Newf(errors.TypeUnsupported, authtypes.ErrCodeRoleUnsupported, "not implemented")
}

View File

@@ -74,46 +74,6 @@ func (handler *handler) Get(rw http.ResponseWriter, r *http.Request) {
render.Success(rw, http.StatusOK, roleWithTransactionGroups)
}
func (handler *handler) GetObjects(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
id, ok := mux.Vars(r)["id"]
if !ok {
render.Error(rw, errors.New(errors.TypeInvalidInput, authtypes.ErrCodeRoleInvalidInput, "id is missing from the request"))
return
}
roleID, err := valuer.NewUUID(id)
if err != nil {
render.Error(rw, err)
return
}
relationStr, ok := mux.Vars(r)["relation"]
if !ok {
render.Error(rw, errors.New(errors.TypeInvalidInput, authtypes.ErrCodeRoleInvalidInput, "relation is missing from the request"))
return
}
relation, err := coretypes.NewVerb(relationStr)
if err != nil {
render.Error(rw, err)
return
}
objects, err := handler.authz.GetObjects(ctx, valuer.MustNewUUID(claims.OrgID), roleID, authtypes.Relation{Verb: relation})
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, coretypes.NewObjectGroupsFromObjects(objects))
}
func (handler *handler) List(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
claims, err := authtypes.ClaimsFromContext(ctx)
@@ -131,99 +91,6 @@ func (handler *handler) List(rw http.ResponseWriter, r *http.Request) {
render.Success(rw, http.StatusOK, roles)
}
func (handler *handler) Patch(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
id, err := valuer.NewUUID(mux.Vars(r)["id"])
if err != nil {
render.Error(rw, err)
return
}
req := new(authtypes.PatchableRole)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
role, err := handler.authz.Get(ctx, valuer.MustNewUUID(claims.OrgID), id)
if err != nil {
render.Error(rw, err)
return
}
err = role.PatchMetadata(req.Description)
if err != nil {
render.Error(rw, err)
return
}
err = handler.authz.Patch(ctx, valuer.MustNewUUID(claims.OrgID), role)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusNoContent, nil)
}
func (handler *handler) PatchObjects(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
id, err := valuer.NewUUID(mux.Vars(r)["id"])
if err != nil {
render.Error(rw, err)
return
}
relation, err := coretypes.NewVerb(mux.Vars(r)["relation"])
if err != nil {
render.Error(rw, err)
return
}
role, err := handler.authz.Get(ctx, valuer.MustNewUUID(claims.OrgID), id)
if err != nil {
render.Error(rw, err)
return
}
if err := role.ErrIfManaged(); err != nil {
render.Error(rw, err)
return
}
req := new(coretypes.PatchableObjects)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
additions, deletions, err := coretypes.NewPatchableObjects(req.Additions, req.Deletions, relation)
if err != nil {
render.Error(rw, err)
return
}
err = handler.authz.PatchObjects(ctx, valuer.MustNewUUID(claims.OrgID), role.Name, authtypes.Relation{Verb: relation}, additions, deletions)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusNoContent, nil)
}
func (handler *handler) Update(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
claims, err := authtypes.ClaimsFromContext(ctx)

View File

@@ -11,13 +11,11 @@ import (
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/coretypes"
"github.com/SigNoz/signoz/pkg/valuer"
openfgav1 "github.com/openfga/api/proto/openfga/v1"
"github.com/uptrace/bun"
)
var (
ErrCodeRoleInvalidInput = errors.MustNewCode("role_invalid_input")
ErrCodeRoleEmptyPatch = errors.MustNewCode("role_empty_patch")
ErrCodeInvalidTypeRelation = errors.MustNewCode("role_invalid_type_relation")
ErrCodeRoleNotFound = errors.MustNewCode("role_not_found")
ErrCodeRoleAlreadyExists = errors.MustNewCode("role_already_exists")
@@ -90,10 +88,6 @@ type UpdatableRole struct {
TransactionGroups TransactionGroups `json:"transactionGroups" required:"true" nullable:"false"`
}
type PatchableRole struct {
Description string `json:"description" required:"true"`
}
func NewRole(name, description string, roleType valuer.String, orgID valuer.UUID) *Role {
return &Role{
Identifiable: types.Identifiable{
@@ -150,17 +144,6 @@ func NewStatsFromRoles(roles []*Role) map[string]any {
return stats
}
func (role *Role) PatchMetadata(description string) error {
err := role.ErrIfManaged()
if err != nil {
return err
}
role.Description = description
role.UpdatedAt = time.Now()
return nil
}
func (role *RoleWithTransactionGroups) Update(description string, transactionGroups TransactionGroups) error {
err := role.ErrIfManaged()
if err != nil {
@@ -247,73 +230,6 @@ func (role *UpdatableRole) UnmarshalJSON(data []byte) error {
return nil
}
func (role *PatchableRole) UnmarshalJSON(data []byte) error {
type shadowPatchableRole struct {
Description string `json:"description"`
}
var shadowRole shadowPatchableRole
if err := json.Unmarshal(data, &shadowRole); err != nil {
return err
}
if shadowRole.Description == "" {
return errors.New(errors.TypeInvalidInput, ErrCodeRoleEmptyPatch, "empty role patch request received, description must be present")
}
role.Description = shadowRole.Description
return nil
}
func GetAdditionTuples(name string, orgID valuer.UUID, relation Relation, additions []*coretypes.Object) ([]*openfgav1.TupleKey, error) {
tuples := make([]*openfgav1.TupleKey, 0)
for _, object := range additions {
resource := coretypes.MustNewResourceFromTypeAndKind(object.Resource.Type, object.Resource.Kind)
transactionTuples := NewTuples(
resource,
MustNewSubject(
coretypes.NewResourceRole(),
name,
orgID,
&coretypes.VerbAssignee,
),
relation,
[]coretypes.Selector{object.Selector},
orgID,
)
tuples = append(tuples, transactionTuples...)
}
return tuples, nil
}
func GetDeletionTuples(name string, orgID valuer.UUID, relation Relation, deletions []*coretypes.Object) ([]*openfgav1.TupleKey, error) {
tuples := make([]*openfgav1.TupleKey, 0)
for _, object := range deletions {
resource := coretypes.MustNewResourceFromTypeAndKind(object.Resource.Type, object.Resource.Kind)
transactionTuples := NewTuples(
resource,
MustNewSubject(
coretypes.NewResourceRole(),
name,
orgID,
&coretypes.VerbAssignee,
),
relation,
[]coretypes.Selector{object.Selector},
orgID,
)
tuples = append(tuples, transactionTuples...)
}
return tuples, nil
}
func MustGetSigNozManagedRoleFromExistingRole(role types.Role) string {
managedRole, ok := ExistingRoleToSigNozManagedRoleMap[role]
if !ok {

100
tests/fixtures/role.py vendored
View File

@@ -1,76 +1,56 @@
"""Fixtures and helpers for role tests."""
"""Fixtures and data helpers for role tests: role lookup, request-body builder, grant comparison, and the golden managed-role matrix."""
import json
from collections.abc import Callable
from http import HTTPStatus
import pytest
import requests
from fixtures import types
from fixtures.logger import setup_logger
logger = setup_logger(__name__)
ROLES_BASE = "/api/v1/roles"
from fixtures.fs import get_testdata_file_path
def find_role_by_name(signoz: types.SigNoz, token: str, name: str) -> str:
"""Find a role by name from the roles endpoint and return its UUID."""
resp = requests.get(
signoz.self.host_configs["8080"].get(ROLES_BASE),
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.OK, resp.text
roles = resp.json()["data"]
role = next(r for r in roles if r["name"] == name)
return role["id"]
@pytest.fixture(name="find_role_id", scope="function")
def find_role_id(signoz: types.SigNoz) -> Callable[[str, str], str]:
def _find(token: str, name: str) -> str:
resp = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.OK, resp.text
return next(r["id"] for r in resp.json()["data"] if r["name"] == name)
return _find
def create_custom_role(signoz: types.SigNoz, token: str, name: str) -> str:
"""Create a custom role and return its ID. transactionGroups is required (send [] for none)."""
resp = requests.post(
signoz.self.host_configs["8080"].get(ROLES_BASE),
json={"name": name, "transactionGroups": []},
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.CREATED, resp.text
return resp.json()["data"]["id"]
def transaction_group(relation: str, type_name: str, kind_name: str, selectors: list[str]) -> dict:
return {"relation": relation, "objectGroup": {"resource": {"type": type_name, "kind": kind_name}, "selectors": selectors}}
def delete_custom_role(signoz: types.SigNoz, token: str, role_id: str) -> None:
"""Delete a custom role."""
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"{ROLES_BASE}/{role_id}"),
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
def flatten_transaction_groups(groups: list[dict]) -> set[tuple[str, str, str, str]]:
flat: set[tuple[str, str, str, str]] = set()
for group in groups or []:
resource = group["objectGroup"]["resource"]
for selector in group["objectGroup"]["selectors"]:
flat.add((group["relation"], resource["type"], resource["kind"], selector))
return flat
def patch_role_objects(
signoz: types.SigNoz,
token: str,
role_id: str,
relation: str,
additions=None,
deletions=None,
) -> None:
"""PATCH /api/v1/roles/{id}/relations/{relation}/objects."""
body = {}
if additions is not None:
body["additions"] = additions
if deletions is not None:
body["deletions"] = deletions
resp = requests.patch(
signoz.self.host_configs["8080"].get(f"{ROLES_BASE}/{role_id}/relations/{relation}/objects"),
json=body,
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, f"PatchObjects {relation} failed: {resp.text}"
def load_managed_role_grants() -> dict[str, list[dict]]:
with open(get_testdata_file_path("role/managed_role_grants.json"), encoding="utf-8") as file:
raw = json.load(file)
return {name: grants for name, grants in raw.items() if not name.startswith("_")}
def object_group(type_name: str, kind_name: str, selectors: list[str]) -> dict:
"""Build an ObjectGroup dict for PatchObjects."""
return {"resource": {"type": type_name, "kind": kind_name}, "selectors": selectors}
def managed_role_names() -> set[str]:
return set(load_managed_role_grants().keys())
def expected_managed_grant_keys(role_name: str) -> set[tuple[str, str, str, str]]:
keys: set[tuple[str, str, str, str]] = set()
for grant in load_managed_role_grants()[role_name]:
for verb in grant["verbs"]:
keys.add((verb, grant["type"], grant["kind"], "*"))
return keys

View File

@@ -6,13 +6,22 @@ import requests
from fixtures import types
from fixtures.logger import setup_logger
from fixtures.role import ROLES_BASE, find_role_by_name # noqa: F401 — re-export for existing callers
logger = setup_logger(__name__)
SERVICE_ACCOUNT_BASE = "/api/v1/service_accounts"
def find_role_by_name(signoz: types.SigNoz, token: str, name: str) -> str:
resp = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.OK, resp.text
return next(r["id"] for r in resp.json()["data"] if r["name"] == name)
def create_service_account(signoz: types.SigNoz, token: str, name: str, role: str = "signoz-viewer") -> str:
"""Create a service account, assign a role, and return its ID."""
resp = requests.post(

View File

@@ -0,0 +1,704 @@
{
"signoz-admin": [
{
"type": "role",
"kind": "role",
"verbs": [
"create",
"read",
"update",
"delete",
"list",
"attach",
"detach"
]
},
{
"type": "user",
"kind": "user",
"verbs": [
"create",
"read",
"update",
"delete",
"list",
"attach",
"detach"
]
},
{
"type": "serviceaccount",
"kind": "serviceaccount",
"verbs": [
"create",
"read",
"update",
"delete",
"list",
"attach",
"detach"
]
},
{
"type": "metaresource",
"kind": "auth-domain",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "cloud-integration",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "cloud-integration-service",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "integration",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "factor-api-key",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "factor-password",
"verbs": [
"create",
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "license",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "subscription",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "org-preference",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "public-dashboard",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "session",
"verbs": [
"read",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "dashboard",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "pipeline",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "planned-maintenance",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "rule",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "saved-view",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "trace-funnel",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "ingestion-key",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "ingestion-limit",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "notification-channel",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "route-policy",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "apdex-setting",
"verbs": [
"read",
"update",
"list"
]
},
{
"type": "metaresource",
"kind": "quick-filter",
"verbs": [
"read",
"update",
"list"
]
},
{
"type": "metaresource",
"kind": "ttl-setting",
"verbs": [
"read",
"update",
"list"
]
},
{
"type": "metaresource",
"kind": "user-preference",
"verbs": [
"read",
"update",
"list"
]
},
{
"type": "telemetryresource",
"kind": "logs",
"verbs": [
"read"
]
},
{
"type": "telemetryresource",
"kind": "traces",
"verbs": [
"read"
]
},
{
"type": "telemetryresource",
"kind": "metrics",
"verbs": [
"read"
]
},
{
"type": "telemetryresource",
"kind": "audit-logs",
"verbs": [
"read"
]
},
{
"type": "telemetryresource",
"kind": "meter-metrics",
"verbs": [
"read"
]
},
{
"type": "metaresource",
"kind": "logs-field",
"verbs": [
"read",
"update",
"list"
]
},
{
"type": "metaresource",
"kind": "traces-field",
"verbs": [
"read",
"update",
"list"
]
}
],
"signoz-editor": [
{
"type": "metaresource",
"kind": "dashboard",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "pipeline",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "planned-maintenance",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "rule",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "saved-view",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "trace-funnel",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "integration",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "ingestion-key",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "ingestion-limit",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "notification-channel",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "route-policy",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "apdex-setting",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "quick-filter",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "ttl-setting",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "user-preference",
"verbs": [
"read",
"update",
"list"
]
},
{
"type": "telemetryresource",
"kind": "logs",
"verbs": [
"read"
]
},
{
"type": "telemetryresource",
"kind": "traces",
"verbs": [
"read"
]
},
{
"type": "telemetryresource",
"kind": "metrics",
"verbs": [
"read"
]
},
{
"type": "metaresource",
"kind": "logs-field",
"verbs": [
"read",
"update",
"list"
]
},
{
"type": "metaresource",
"kind": "traces-field",
"verbs": [
"read",
"update",
"list"
]
}
],
"signoz-viewer": [
{
"type": "metaresource",
"kind": "dashboard",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "pipeline",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "planned-maintenance",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "rule",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "saved-view",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "trace-funnel",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "integration",
"verbs": [
"create",
"read",
"update",
"delete",
"list"
]
},
{
"type": "metaresource",
"kind": "notification-channel",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "route-policy",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "apdex-setting",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "quick-filter",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "ttl-setting",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "user-preference",
"verbs": [
"read",
"update",
"list"
]
},
{
"type": "telemetryresource",
"kind": "logs",
"verbs": [
"read"
]
},
{
"type": "telemetryresource",
"kind": "traces",
"verbs": [
"read"
]
},
{
"type": "telemetryresource",
"kind": "metrics",
"verbs": [
"read"
]
},
{
"type": "metaresource",
"kind": "logs-field",
"verbs": [
"read",
"list"
]
},
{
"type": "metaresource",
"kind": "traces-field",
"verbs": [
"read",
"list"
]
}
],
"signoz-anonymous": [
{
"type": "metaresource",
"kind": "public-dashboard",
"verbs": [
"read"
]
}
]
}

View File

@@ -3,13 +3,15 @@ from http import HTTPStatus
import pytest
import requests
from sqlalchemy import sql
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.role import (
expected_managed_grant_keys,
flatten_transaction_groups,
managed_role_names,
)
from fixtures.types import Operation, SigNoz
ANONYMOUS_USER_ID = "00000000-0000-0000-0000-000000000000"
def test_managed_roles_create_on_register(
signoz: SigNoz,
@@ -18,135 +20,41 @@ def test_managed_roles_create_on_register(
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# get the list of all roles.
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
data = response.json()["data"]
# since this check happens immediately post registeration, all the managed roles should be present.
assert len(data) == 4
role_names = {role["name"] for role in data}
expected_names = {
"signoz-admin",
"signoz-viewer",
"signoz-editor",
"signoz-anonymous",
}
# do the set mapping as this is order insensitive, direct list match is order-sensitive.
assert set(role_names) == expected_names
def test_root_user_signoz_admin_assignment(
request: pytest.FixtureRequest,
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Get the user from the v2 /users/me endpoint and extract the id
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert response.status_code == HTTPStatus.OK
user_data = response.json()["data"]
user_id = user_data["id"]
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
# this validates to some extent that the role assignment is complete under the assumption that middleware is functioning as expected.
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
data = response.json()["data"]
# Loop over the roles and get the org_id and id for signoz-admin role
roles = response.json()["data"]
admin_role_entry = next((role for role in roles if role["name"] == "signoz-admin"), None)
assert admin_role_entry is not None
org_id = admin_role_entry["orgId"]
# to be super sure of authorization server, let's validate the tuples in DB as well.
# todo[@vikrantgupta25]: replace this with role memebers handler once built.
with signoz.sqlstore.conn.connect() as conn:
# verify the entry present for role assignment
tuple_object_id = f"organization/{org_id}/role/signoz-admin"
tuple_result = conn.execute(
sql.text("SELECT * FROM tuple WHERE object_id = :object_id"),
{"object_id": tuple_object_id},
)
tuple_row = tuple_result.mappings().fetchone()
assert tuple_row is not None
# check that the tuple if for role assignment
assert tuple_row["object_type"] == "role"
assert tuple_row["relation"] == "assignee"
if request.config.getoption("--sqlstore-provider") == "sqlite":
user_object_id = f"organization/{org_id}/user/{user_id}"
assert tuple_row["user_object_type"] == "user"
assert tuple_row["user_object_id"] == user_object_id
else:
_user = f"user:organization/{org_id}/user/{user_id}"
assert tuple_row["user_type"] == "user"
assert tuple_row["_user"] == _user
assert len(data) == 4
assert {role["name"] for role in data} == managed_role_names()
for role in data:
assert role["type"] == "managed"
def test_anonymous_user_signoz_anonymous_assignment(
request: pytest.FixtureRequest,
@pytest.mark.parametrize("role_name", managed_role_names())
def test_managed_role_grants_match_expected(
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
role_name: str,
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
role_id = find_role_id(admin_token, role_name)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
timeout=5,
)
assert response.status_code == HTTPStatus.OK, response.text
role = response.json()["data"]
assert role["type"] == "managed"
# this validates to some extent that the role assignment is complete under the assumption that middleware is functioning as expected.
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
# Loop over the roles and get the org_id and id for signoz-admin role
roles = response.json()["data"]
admin_role_entry = next((role for role in roles if role["name"] == "signoz-anonymous"), None)
assert admin_role_entry is not None
org_id = admin_role_entry["orgId"]
# to be super sure of authorization server, let's validate the tuples in DB as well.
# todo[@vikrantgupta25]: replace this with role memebers handler once built.
with signoz.sqlstore.conn.connect() as conn:
# verify the entry present for role assignment
tuple_object_id = f"organization/{org_id}/role/signoz-anonymous"
tuple_result = conn.execute(
sql.text("SELECT * FROM tuple WHERE object_id = :object_id"),
{"object_id": tuple_object_id},
)
tuple_row = tuple_result.mappings().fetchone()
assert tuple_row is not None
# check that the tuple if for role assignment
assert tuple_row["object_type"] == "role"
assert tuple_row["relation"] == "assignee"
if request.config.getoption("--sqlstore-provider") == "sqlite":
user_object_id = f"organization/{org_id}/anonymous/{ANONYMOUS_USER_ID}"
assert tuple_row["user_object_type"] == "anonymous"
assert tuple_row["user_object_id"] == user_object_id
else:
_user = f"anonymous:organization/{org_id}/anonymous/{ANONYMOUS_USER_ID}"
assert tuple_row["user_type"] == "user"
assert tuple_row["_user"] == _user
actual = flatten_transaction_groups(role.get("transactionGroups") or [])
expected = expected_managed_grant_keys(role_name)
assert actual == expected, f"{role_name} grants mismatch:\n missing={expected - actual}\n unexpected={actual - expected}"

View File

@@ -0,0 +1,350 @@
from collections.abc import Callable
from http import HTTPStatus
import requests
from wiremock.resources.mappings import Mapping
from fixtures import types
from fixtures.auth import (
USER_ADMIN_EMAIL,
USER_ADMIN_PASSWORD,
add_license,
create_active_user,
find_user_by_email,
)
from fixtures.role import flatten_transaction_groups, transaction_group
CRUD_ROLE_NAME = "crud-test-role"
CRUD_ASSIGNEE_ROLE_NAME = "crud-assignee-role"
CRUD_ASSIGNEE_USER_EMAIL = "crud+assignee@integration.test"
CRUD_ASSIGNEE_USER_PASSWORD = "password123Z$"
def test_custom_role_create_requires_license(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
resp = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
json={"name": "crud-no-license", "transactionGroups": []},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.UNAVAILABLE_FOR_LEGAL_REASONS, f"expected 451 without license, got {resp.status_code}: {resp.text}"
def test_apply_license(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
make_http_mocks: Callable[[types.TestContainerDocker, list[Mapping]], None],
get_token: Callable[[str, str], str],
) -> None:
add_license(signoz, make_http_mocks, get_token)
def test_create_get_list_roundtrip(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
groups = [
transaction_group("read", "metaresource", "dashboard", ["*"]),
transaction_group("list", "metaresource", "dashboard", ["*"]),
transaction_group("read", "metaresource", "rule", ["*"]),
]
resp = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
json={"name": CRUD_ROLE_NAME, "description": "crud role", "transactionGroups": groups},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.CREATED, resp.text
role_id = resp.json()["data"]["id"]
resp = requests.get(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.OK, resp.text
role = resp.json()["data"]
assert role["name"] == CRUD_ROLE_NAME
assert role["type"] == "custom"
assert role["description"] == "crud role"
assert flatten_transaction_groups(role["transactionGroups"]) == flatten_transaction_groups(groups)
resp = requests.get(signoz.self.host_configs["8080"].get("/api/v1/roles"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.OK, resp.text
assert CRUD_ROLE_NAME in {r["name"] for r in resp.json()["data"]}
def test_declarative_update_adds_and_removes_grants(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
role_id = find_role_id(admin_token, CRUD_ROLE_NAME)
def put_grants(groups: list[dict]) -> None:
resp = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"),
json={"description": "crud role", "transactionGroups": groups},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
def current_grants() -> set:
resp = requests.get(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.OK, resp.text
return flatten_transaction_groups(resp.json()["data"]["transactionGroups"])
superset = [
transaction_group("read", "metaresource", "dashboard", ["*"]),
transaction_group("list", "metaresource", "dashboard", ["*"]),
transaction_group("create", "metaresource", "dashboard", ["*"]),
transaction_group("update", "metaresource", "dashboard", ["*"]),
transaction_group("read", "metaresource", "rule", ["*"]),
]
put_grants(superset)
assert current_grants() == flatten_transaction_groups(superset)
subset = [transaction_group("read", "metaresource", "dashboard", ["*"])]
put_grants(subset)
assert current_grants() == flatten_transaction_groups(subset)
put_grants([])
assert current_grants() == set()
resp = requests.delete(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
def test_update_changes_description(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
groups = [transaction_group("read", "metaresource", "dashboard", ["*"])]
resp = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
json={"name": "crud-desc-role", "description": "initial", "transactionGroups": groups},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.CREATED, resp.text
role_id = resp.json()["data"]["id"]
resp = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"),
json={"description": "updated", "transactionGroups": None},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.BAD_REQUEST, f"null transactionGroups: expected 400, got {resp.status_code}: {resp.text}"
resp = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"),
json={"description": "updated", "transactionGroups": groups},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
resp = requests.get(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.OK, resp.text
role = resp.json()["data"]
assert role["description"] == "updated"
assert flatten_transaction_groups(role["transactionGroups"]) == flatten_transaction_groups(groups)
resp = requests.delete(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
def test_create_validation_rejected(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
bad_bodies = {
"reserved-prefix": {"name": "signoz-nope", "transactionGroups": []},
"invalid-name-chars": {"name": "Bad_Name", "transactionGroups": []},
"name-too-long": {"name": "a" * 51, "transactionGroups": []},
"verb-invalid-for-resource": {
"name": "crud-bad-verb",
"transactionGroups": [transaction_group("assignee", "metaresource", "dashboard", ["*"])],
},
"unknown-type": {
"name": "crud-bad-type",
"transactionGroups": [transaction_group("read", "not-a-type", "dashboard", ["*"])],
},
"unknown-kind": {
"name": "crud-bad-kind",
"transactionGroups": [transaction_group("read", "metaresource", "not-a-kind", ["*"])],
},
"bad-selector-metaresource": {
"name": "crud-bad-selector",
"transactionGroups": [transaction_group("read", "metaresource", "dashboard", ["not-a-uuid"])],
},
"bad-selector-telemetry": {
"name": "crud-bad-telemetry-selector",
"transactionGroups": [transaction_group("read", "telemetryresource", "logs", ["not-a-wildcard"])],
},
}
for label, body in bad_bodies.items():
resp = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
json=body,
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.BAD_REQUEST, f"{label}: expected 400, got {resp.status_code}: {resp.text}"
def test_duplicate_name_conflict(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
resp = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
json={"name": "crud-dup-role", "transactionGroups": []},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.CREATED, resp.text
role_id = resp.json()["data"]["id"]
try:
resp = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
json={"name": "crud-dup-role", "transactionGroups": []},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.CONFLICT, f"expected 409, got {resp.status_code}: {resp.text}"
finally:
resp = requests.delete(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
def test_managed_role_is_immutable(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
admin_role_id = find_role_id(admin_token, "signoz-admin")
resp = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{admin_role_id}"),
json={"description": "hijacked", "transactionGroups": []},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.BAD_REQUEST, f"update managed role: expected 400, got {resp.status_code}: {resp.text}"
resp = requests.delete(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{admin_role_id}"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.BAD_REQUEST, f"delete managed role: expected 400, got {resp.status_code}: {resp.text}"
def test_delete_role_with_assignee_guarded(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
resp = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
json={
"name": CRUD_ASSIGNEE_ROLE_NAME,
"transactionGroups": [transaction_group("read", "metaresource", "dashboard", ["*"])],
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.CREATED, resp.text
role_id = resp.json()["data"]["id"]
user_id = create_active_user(
signoz,
admin_token,
email=CRUD_ASSIGNEE_USER_EMAIL,
role="VIEWER",
password=CRUD_ASSIGNEE_USER_PASSWORD,
name="crud-assignee-user",
)
resp = requests.post(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user_id}/roles"),
json={"name": CRUD_ASSIGNEE_ROLE_NAME},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.OK, resp.text
resp = requests.delete(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.BAD_REQUEST, f"delete role with assignee: expected 400, got {resp.status_code}: {resp.text}"
resp = requests.get(signoz.self.host_configs["8080"].get(f"/api/v2/users/{user_id}/roles"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.OK, resp.text
entry = next(r for r in resp.json()["data"] if r["name"] == CRUD_ASSIGNEE_ROLE_NAME)
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user_id}/roles/{entry['id']}"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
resp = requests.delete(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
def test_delete_removes_role(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
resp = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
json={"name": "crud-del-role", "transactionGroups": []},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.CREATED, resp.text
role_id = resp.json()["data"]["id"]
resp = requests.delete(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
resp = requests.get(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.NOT_FOUND, f"expected 404 after delete, got {resp.status_code}: {resp.text}"
def test_cleanup_assignee_user(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
user = find_user_by_email(signoz, admin_token, CRUD_ASSIGNEE_USER_EMAIL)
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user['id']}"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text

View File

@@ -1,215 +0,0 @@
from collections.abc import Callable
from http import HTTPStatus
import pytest
import requests
from sqlalchemy import sql
from fixtures.auth import (
USER_ADMIN_EMAIL,
USER_ADMIN_PASSWORD,
USER_EDITOR_EMAIL,
USER_EDITOR_PASSWORD,
change_user_role,
)
from fixtures.types import Operation, SigNoz
def test_user_invite_accept_role_grant(
request: pytest.FixtureRequest,
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# invite a user as editor
invite_payload = {
"email": USER_EDITOR_EMAIL,
"role": "EDITOR",
}
invite_response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/invite"),
json=invite_payload,
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert invite_response.status_code == HTTPStatus.CREATED
invited_user = invite_response.json()["data"]
reset_token = invited_user["token"]
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/resetPassword"),
json={"password": USER_EDITOR_PASSWORD, "token": reset_token},
timeout=2,
)
assert response.status_code == HTTPStatus.NO_CONTENT
# Login with editor email and password
editor_token = get_token(USER_EDITOR_EMAIL, USER_EDITOR_PASSWORD)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
headers={"Authorization": f"Bearer {editor_token}"},
timeout=5,
)
assert response.status_code == HTTPStatus.OK
editor_data = response.json()["data"]
editor_id = editor_data["id"]
# check the forbidden response for admin api for editor user
admin_roles_response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
headers={"Authorization": f"Bearer {editor_token}"},
timeout=2,
)
assert admin_roles_response.status_code == HTTPStatus.FORBIDDEN
roles_response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert roles_response.status_code == HTTPStatus.OK
org_id = roles_response.json()["data"][0]["orgId"]
# check role assignment tuples in DB
with signoz.sqlstore.conn.connect() as conn:
tuple_object_id = f"organization/{org_id}/role/signoz-editor"
tuple_result = conn.execute(
sql.text("SELECT * FROM tuple WHERE object_id = :object_id"),
{"object_id": tuple_object_id},
)
tuple_row = tuple_result.mappings().fetchone()
assert tuple_row is not None
assert tuple_row["object_type"] == "role"
assert tuple_row["relation"] == "assignee"
# verify the user tuple details depending on db provider
if request.config.getoption("--sqlstore-provider") == "sqlite":
user_object_id = f"organization/{org_id}/user/{editor_id}"
assert tuple_row["user_object_type"] == "user"
assert tuple_row["user_object_id"] == user_object_id
else:
_user = f"user:organization/{org_id}/user/{editor_id}"
assert tuple_row["user_type"] == "user"
assert tuple_row["_user"] == _user
def test_user_update_role_grant(
request: pytest.FixtureRequest,
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
# Get the editor user's id
editor_token = get_token(USER_EDITOR_EMAIL, USER_EDITOR_PASSWORD)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
headers={"Authorization": f"Bearer {editor_token}"},
timeout=5,
)
assert response.status_code == HTTPStatus.OK
editor_data = response.json()["data"]
editor_id = editor_data["id"]
# Get the role id for viewer
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
roles_response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert roles_response.status_code == HTTPStatus.OK
roles_data = roles_response.json()["data"]
org_id = roles_data[0]["orgId"]
# Update the user's role to viewer via v2 role endpoints
change_user_role(signoz, admin_token, editor_id, "signoz-editor", "signoz-viewer")
# Check that user no longer has the editor role in the db
with signoz.sqlstore.conn.connect() as conn:
editor_tuple_object_id = f"organization/{org_id}/role/signoz-editor"
viewer_tuple_object_id = f"organization/{org_id}/role/signoz-viewer"
# Check there is no tuple for signoz-editor assignment
editor_tuple_result = conn.execute(
sql.text("SELECT * FROM tuple WHERE object_id = :object_id AND relation = 'assignee'"),
{"object_id": editor_tuple_object_id},
)
for row in editor_tuple_result.mappings().fetchall():
if request.config.getoption("--sqlstore-provider") == "sqlite":
user_object_id = f"organization/{org_id}/user/{editor_id}"
assert row["user_object_id"] != user_object_id
else:
_user = f"user:organization/{org_id}/user/{editor_id}"
assert row["_user"] != _user
# Check that a tuple exists for signoz-viewer assignment
viewer_tuple_result = conn.execute(
sql.text("SELECT * FROM tuple WHERE object_id = :object_id AND relation = 'assignee'"),
{"object_id": viewer_tuple_object_id},
)
row = viewer_tuple_result.mappings().fetchone()
assert row is not None
assert row["object_type"] == "role"
assert row["relation"] == "assignee"
if request.config.getoption("--sqlstore-provider") == "sqlite":
user_object_id = f"organization/{org_id}/user/{editor_id}"
assert row["user_object_type"] == "user"
assert row["user_object_id"] == user_object_id
else:
_user = f"user:organization/{org_id}/user/{editor_id}"
assert row["user_type"] == "user"
assert row["_user"] == _user
def test_user_delete_role_revoke(
request: pytest.FixtureRequest,
signoz: SigNoz,
create_user_admin: Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
# login with editor to get the user_id and check if user exists
editor_token = get_token(USER_EDITOR_EMAIL, USER_EDITOR_PASSWORD)
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v2/users/me"),
headers={"Authorization": f"Bearer {editor_token}"},
timeout=5,
)
assert response.status_code == HTTPStatus.OK
editor_data = response.json()["data"]
editor_id = editor_data["id"]
# delete the editor user
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
delete_response = requests.delete(
signoz.self.host_configs["8080"].get(f"/api/v1/user/{editor_id}"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert delete_response.status_code == HTTPStatus.NO_CONTENT
# get the role id from roles list
roles_response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert roles_response.status_code == HTTPStatus.OK
org_id = roles_response.json()["data"][0]["orgId"]
tuple_object_id = f"organization/{org_id}/role/signoz-editor"
with signoz.sqlstore.conn.connect() as conn:
tuple_result = conn.execute(
sql.text("SELECT * FROM tuple WHERE object_id = :object_id AND relation = 'assignee'"),
{"object_id": tuple_object_id},
)
# there should NOT be any tuple for the current user assignment
tuple_rows = tuple_result.mappings().fetchall()
for row in tuple_rows:
if request.config.getoption("--sqlstore-provider") == "sqlite":
user_object_id = f"organization/{org_id}/user/{editor_id}"
assert row["user_object_id"] != user_object_id
else:
_user = f"user:organization/{org_id}/user/{editor_id}"
assert row["_user"] != _user

View File

@@ -1,10 +1,3 @@
"""Tests for resource-level FGA on role endpoints.
Validates that a custom role with specific role permissions gets exactly
the access it was granted — read/list allowed, create/update/delete forbidden
until explicitly granted, and revocation removes access.
"""
from collections.abc import Callable
from http import HTTPStatus
@@ -20,25 +13,13 @@ from fixtures.auth import (
create_active_user,
find_user_by_email,
)
from fixtures.role import (
ROLES_BASE,
create_custom_role,
delete_custom_role,
find_role_by_name,
object_group,
patch_role_objects,
)
from fixtures.role import transaction_group
ROLE_FGA_CUSTOM_ROLE_NAME = "role-fga-readonly"
ROLE_FGA_CUSTOM_USER_EMAIL = "customrole+rolefga@integration.test"
ROLE_FGA_CUSTOM_USER_PASSWORD = "password123Z$"
# ---------------------------------------------------------------------------
# 1. Apply license (required for custom role CRUD)
# ---------------------------------------------------------------------------
def test_apply_license(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
@@ -48,33 +29,6 @@ def test_apply_license(
add_license(signoz, make_http_mocks, get_token)
# ---------------------------------------------------------------------------
# 2. Reject role names starting with "signoz-"
# ---------------------------------------------------------------------------
def test_create_role_with_signoz_prefix_rejected(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
for name in ("signoz-custom", "signozcustom"):
resp = requests.post(
signoz.self.host_configs["8080"].get(ROLES_BASE),
json={"name": name, "transactionGroups": []},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.BAD_REQUEST, f"expected 400 for role name '{name}', got {resp.status_code}: {resp.text}"
# ---------------------------------------------------------------------------
# 3. Create custom role + user with read/list on roles
# ---------------------------------------------------------------------------
def test_create_custom_role_for_role_fga(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
@@ -82,32 +36,20 @@ def test_create_custom_role_for_role_fga(
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Create the custom role.
role_id = create_custom_role(signoz, admin_token, ROLE_FGA_CUSTOM_ROLE_NAME)
# Grant read on role instances.
patch_role_objects(
signoz,
admin_token,
role_id,
"read",
additions=[
object_group("role", "role", ["*"]),
],
resp = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
json={
"name": ROLE_FGA_CUSTOM_ROLE_NAME,
"transactionGroups": [
transaction_group("read", "role", "role", ["*"]),
transaction_group("list", "role", "role", ["*"]),
],
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.CREATED, resp.text
# Grant list on role collection.
patch_role_objects(
signoz,
admin_token,
role_id,
"list",
additions=[
object_group("role", "role", ["*"]),
],
)
# Create the custom-role user: invite as VIEWER, activate, change role.
user_id = create_active_user(
signoz,
admin_token,
@@ -119,125 +61,78 @@ def test_create_custom_role_for_role_fga(
change_user_role(signoz, admin_token, user_id, "signoz-viewer", ROLE_FGA_CUSTOM_ROLE_NAME)
# ---------------------------------------------------------------------------
# 3. Read-only access: allowed operations
# ---------------------------------------------------------------------------
def test_role_readonly_allowed_operations(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROLE_FGA_CUSTOM_USER_EMAIL, ROLE_FGA_CUSTOM_USER_PASSWORD)
target_role_id = find_role_by_name(signoz, admin_token, "signoz-viewer")
target_role_id = find_role_id(admin_token, "signoz-viewer")
# List roles.
resp = requests.get(
signoz.self.host_configs["8080"].get(ROLES_BASE),
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
resp = requests.get(signoz.self.host_configs["8080"].get("/api/v1/roles"), headers={"Authorization": f"Bearer {token}"}, timeout=5)
assert resp.status_code == HTTPStatus.OK, f"list roles: {resp.text}"
# Get role.
resp = requests.get(
signoz.self.host_configs["8080"].get(f"{ROLES_BASE}/{target_role_id}"),
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
resp = requests.get(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{target_role_id}"), headers={"Authorization": f"Bearer {token}"}, timeout=5)
assert resp.status_code == HTTPStatus.OK, f"get role: {resp.text}"
# Get objects for role.
resp = requests.get(
signoz.self.host_configs["8080"].get(f"{ROLES_BASE}/{target_role_id}/relations/read/objects"),
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.OK, f"get role objects: {resp.text}"
# ---------------------------------------------------------------------------
# 4. Read-only access: forbidden operations
# ---------------------------------------------------------------------------
def test_role_readonly_forbidden_operations(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(ROLE_FGA_CUSTOM_USER_EMAIL, ROLE_FGA_CUSTOM_USER_PASSWORD)
target_role_id = find_role_by_name(signoz, admin_token, "signoz-viewer")
target_role_id = find_role_id(admin_token, "signoz-viewer")
# Create role — forbidden.
resp = requests.post(
signoz.self.host_configs["8080"].get(ROLES_BASE),
signoz.self.host_configs["8080"].get("/api/v1/roles"),
json={"name": "role-fga-should-fail", "transactionGroups": []},
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"create role: expected 403, got {resp.status_code}: {resp.text}"
# Patch role — forbidden.
resp = requests.patch(
signoz.self.host_configs["8080"].get(f"{ROLES_BASE}/{target_role_id}"),
json={"description": "role-fga-renamed"},
resp = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{target_role_id}"),
json={"description": "role-fga-renamed", "transactionGroups": []},
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"patch role: expected 403, got {resp.status_code}: {resp.text}"
assert resp.status_code == HTTPStatus.FORBIDDEN, f"update role: expected 403, got {resp.status_code}: {resp.text}"
# Patch objects — forbidden.
resp = requests.patch(
signoz.self.host_configs["8080"].get(f"{ROLES_BASE}/{target_role_id}/relations/read/objects"),
json={"additions": [object_group("metaresource", "dashboard", ["*"])]},
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"patch objects: expected 403, got {resp.status_code}: {resp.text}"
# Delete role — forbidden (cannot delete managed role, but auth check comes first).
# Use the custom role itself as target (non-managed, but user lacks delete permission).
custom_role_id = find_role_by_name(signoz, admin_token, ROLE_FGA_CUSTOM_ROLE_NAME)
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"{ROLES_BASE}/{custom_role_id}"),
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
custom_role_id = find_role_id(admin_token, ROLE_FGA_CUSTOM_ROLE_NAME)
resp = requests.delete(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{custom_role_id}"), headers={"Authorization": f"Bearer {token}"}, timeout=5)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"delete role: expected 403, got {resp.status_code}: {resp.text}"
# ---------------------------------------------------------------------------
# 5. Grant write permissions, verify access opens up
# ---------------------------------------------------------------------------
def test_role_grant_write_permissions(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
role_id = find_role_by_name(signoz, admin_token, ROLE_FGA_CUSTOM_ROLE_NAME)
role_id = find_role_id(admin_token, ROLE_FGA_CUSTOM_ROLE_NAME)
# Grant create, update, delete on roles.
for verb in ("create", "update", "delete"):
patch_role_objects(
signoz,
admin_token,
role_id,
verb,
additions=[object_group("role", "role", ["*"])],
)
resp = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"),
json={
"description": "",
"transactionGroups": [transaction_group(verb, "role", "role", ["*"]) for verb in ("read", "list", "create", "update", "delete")],
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
custom_token = get_token(ROLE_FGA_CUSTOM_USER_EMAIL, ROLE_FGA_CUSTOM_USER_PASSWORD)
# Create role — now allowed.
resp = requests.post(
signoz.self.host_configs["8080"].get(ROLES_BASE),
signoz.self.host_configs["8080"].get("/api/v1/roles"),
json={"name": "role-fga-write-test", "transactionGroups": []},
headers={"Authorization": f"Bearer {custom_token}"},
timeout=5,
@@ -245,98 +140,61 @@ def test_role_grant_write_permissions(
assert resp.status_code == HTTPStatus.CREATED, f"create role: {resp.text}"
new_role_id = resp.json()["data"]["id"]
# Patch role — now allowed.
resp = requests.patch(
signoz.self.host_configs["8080"].get(f"{ROLES_BASE}/{new_role_id}"),
json={"description": "role-fga-write-renamed"},
resp = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{new_role_id}"),
json={"description": "role-fga-write-renamed", "transactionGroups": []},
headers={"Authorization": f"Bearer {custom_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, f"patch role: {resp.text}"
assert resp.status_code == HTTPStatus.NO_CONTENT, f"update role: {resp.text}"
# Delete role — now allowed.
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"{ROLES_BASE}/{new_role_id}"),
headers={"Authorization": f"Bearer {custom_token}"},
timeout=5,
)
resp = requests.delete(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{new_role_id}"), headers={"Authorization": f"Bearer {custom_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.NO_CONTENT, f"delete role: {resp.text}"
# ---------------------------------------------------------------------------
# 6. Revoke read/list → verify access lost
# ---------------------------------------------------------------------------
def test_role_revoke_read_permissions(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
role_id = find_role_by_name(signoz, admin_token, ROLE_FGA_CUSTOM_ROLE_NAME)
target_role_id = find_role_by_name(signoz, admin_token, "signoz-viewer")
role_id = find_role_id(admin_token, ROLE_FGA_CUSTOM_ROLE_NAME)
target_role_id = find_role_id(admin_token, "signoz-viewer")
# Revoke read.
patch_role_objects(
signoz,
admin_token,
role_id,
"read",
deletions=[object_group("role", "role", ["*"])],
)
# Revoke list.
patch_role_objects(
signoz,
admin_token,
role_id,
"list",
deletions=[object_group("role", "role", ["*"])],
resp = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"),
json={
"description": "",
"transactionGroups": [transaction_group(verb, "role", "role", ["*"]) for verb in ("create", "update", "delete")],
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
custom_token = get_token(ROLE_FGA_CUSTOM_USER_EMAIL, ROLE_FGA_CUSTOM_USER_PASSWORD)
# List roles — forbidden.
resp = requests.get(
signoz.self.host_configs["8080"].get(ROLES_BASE),
headers={"Authorization": f"Bearer {custom_token}"},
timeout=5,
)
resp = requests.get(signoz.self.host_configs["8080"].get("/api/v1/roles"), headers={"Authorization": f"Bearer {custom_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"list roles after revoke: expected 403, got {resp.status_code}: {resp.text}"
# Get role — forbidden.
resp = requests.get(
signoz.self.host_configs["8080"].get(f"{ROLES_BASE}/{target_role_id}"),
headers={"Authorization": f"Bearer {custom_token}"},
timeout=5,
)
resp = requests.get(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{target_role_id}"), headers={"Authorization": f"Bearer {custom_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"get role after revoke: expected 403, got {resp.status_code}: {resp.text}"
# ---------------------------------------------------------------------------
# 7. Clean up: delete custom role
# ---------------------------------------------------------------------------
def test_role_fga_cleanup(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
role_id = find_role_by_name(signoz, admin_token, ROLE_FGA_CUSTOM_ROLE_NAME)
role_id = find_role_id(admin_token, ROLE_FGA_CUSTOM_ROLE_NAME)
user = find_user_by_email(signoz, admin_token, ROLE_FGA_CUSTOM_USER_EMAIL)
# Remove the custom role from the user first.
resp = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user['id']}/roles"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
resp = requests.get(signoz.self.host_configs["8080"].get(f"/api/v2/users/{user['id']}/roles"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.OK, resp.text
roles = resp.json()["data"]
custom_entry = next((r for r in roles if r["name"] == ROLE_FGA_CUSTOM_ROLE_NAME), None)
custom_entry = next((r for r in resp.json()["data"] if r["name"] == ROLE_FGA_CUSTOM_ROLE_NAME), None)
if custom_entry is not None:
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user['id']}/roles/{custom_entry['id']}"),
@@ -345,4 +203,5 @@ def test_role_fga_cleanup(
)
assert resp.status_code == HTTPStatus.NO_CONTENT, f"remove role from user: {resp.text}"
delete_custom_role(signoz, admin_token, role_id)
resp = requests.delete(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text

View File

@@ -1,7 +1,7 @@
"""Tests for resource-level FGA on service account endpoints.
"""Resource-level FGA on service account endpoints.
Validates that a custom role with specific SA permissions gets exactly
the access it was granted, and that:
A custom role is granted exactly the permissions under test, and the role's full
grant set is re-declared via PUT at each step (no incremental patching). Verifies:
- SA role assignment requires BOTH serviceaccount:attach AND role:attach.
- SA role removal requires BOTH serviceaccount:detach AND role:detach.
- Factor API key creation requires factor-api-key:create AND serviceaccount:attach.
@@ -23,13 +23,7 @@ from fixtures.auth import (
create_active_user,
find_user_by_email,
)
from fixtures.role import (
create_custom_role,
delete_custom_role,
find_role_by_name,
object_group,
patch_role_objects,
)
from fixtures.role import transaction_group
from fixtures.serviceaccount import (
SERVICE_ACCOUNT_BASE,
create_service_account,
@@ -43,11 +37,6 @@ SA_FGA_CUSTOM_USER_PASSWORD = "password123Z$"
SA_FGA_TARGET_SA_NAME = "sa-fga-target"
# ---------------------------------------------------------------------------
# 1. Apply license (required for custom role CRUD)
# ---------------------------------------------------------------------------
def test_apply_license(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
@@ -57,11 +46,6 @@ def test_apply_license(
add_license(signoz, make_http_mocks, get_token)
# ---------------------------------------------------------------------------
# 2. Create custom role + user
# ---------------------------------------------------------------------------
def test_create_custom_role_readonly_sa(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
@@ -69,54 +53,17 @@ def test_create_custom_role_readonly_sa(
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Create the custom role.
role_id = create_custom_role(signoz, admin_token, SA_FGA_CUSTOM_ROLE_NAME)
# Grant read on serviceaccount instances.
patch_role_objects(
signoz,
admin_token,
role_id,
"read",
additions=[
object_group("serviceaccount", "serviceaccount", ["*"]),
],
resp = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/roles"),
json={
"name": SA_FGA_CUSTOM_ROLE_NAME,
"transactionGroups": [transaction_group(verb, "serviceaccount", "serviceaccount", ["*"]) for verb in ("read", "list")] + [transaction_group(verb, "metaresource", "factor-api-key", ["*"]) for verb in ("read", "list")],
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.CREATED, resp.text
# Grant list on serviceaccount (now on the serviceaccount type directly).
patch_role_objects(
signoz,
admin_token,
role_id,
"list",
additions=[
object_group("serviceaccount", "serviceaccount", ["*"]),
],
)
# Grant read on factor-api-key (needed for listing keys).
patch_role_objects(
signoz,
admin_token,
role_id,
"read",
additions=[
object_group("metaresource", "factor-api-key", ["*"]),
],
)
# Grant list on factor-api-key.
patch_role_objects(
signoz,
admin_token,
role_id,
"list",
additions=[
object_group("metaresource", "factor-api-key", ["*"]),
],
)
# Create the custom-role user: invite as VIEWER, activate, change role.
user_id = create_active_user(
signoz,
admin_token,
@@ -127,10 +74,8 @@ def test_create_custom_role_readonly_sa(
)
change_user_role(signoz, admin_token, user_id, "signoz-viewer", SA_FGA_CUSTOM_ROLE_NAME)
# Create a target SA (with role + key) for the custom user to operate on.
sa_id = create_service_account(signoz, admin_token, SA_FGA_TARGET_SA_NAME, role="signoz-viewer")
# Create a key on the target SA.
key_resp = requests.post(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/keys"),
json={"name": "fga-key", "expiresAt": 0},
@@ -140,11 +85,6 @@ def test_create_custom_role_readonly_sa(
assert key_resp.status_code == HTTPStatus.CREATED, key_resp.text
# ---------------------------------------------------------------------------
# 3. Read-only access: allowed operations
# ---------------------------------------------------------------------------
def test_readonly_role_allowed_operations(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
@@ -153,56 +93,31 @@ def test_readonly_role_allowed_operations(
token = get_token(SA_FGA_CUSTOM_USER_EMAIL, SA_FGA_CUSTOM_USER_PASSWORD)
sa_id = find_service_account_by_name(signoz, get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD), SA_FGA_TARGET_SA_NAME)["id"]
# List SAs.
resp = requests.get(
signoz.self.host_configs["8080"].get(SERVICE_ACCOUNT_BASE),
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
resp = requests.get(signoz.self.host_configs["8080"].get(SERVICE_ACCOUNT_BASE), headers={"Authorization": f"Bearer {token}"}, timeout=5)
assert resp.status_code == HTTPStatus.OK, f"list SAs: {resp.text}"
# Get SA.
resp = requests.get(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}"),
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
resp = requests.get(signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}"), headers={"Authorization": f"Bearer {token}"}, timeout=5)
assert resp.status_code == HTTPStatus.OK, f"get SA: {resp.text}"
# Get SA roles.
resp = requests.get(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/roles"),
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
resp = requests.get(signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/roles"), headers={"Authorization": f"Bearer {token}"}, timeout=5)
assert resp.status_code == HTTPStatus.OK, f"get SA roles: {resp.text}"
# List SA keys.
resp = requests.get(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/keys"),
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
resp = requests.get(signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/keys"), headers={"Authorization": f"Bearer {token}"}, timeout=5)
assert resp.status_code == HTTPStatus.OK, f"list SA keys: {resp.text}"
# ---------------------------------------------------------------------------
# 4. Read-only access: forbidden operations
# ---------------------------------------------------------------------------
def test_readonly_role_forbidden_operations(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
token = get_token(SA_FGA_CUSTOM_USER_EMAIL, SA_FGA_CUSTOM_USER_PASSWORD)
sa_id = find_service_account_by_name(signoz, admin_token, SA_FGA_TARGET_SA_NAME)["id"]
viewer_role_id = find_role_by_name(signoz, admin_token, "signoz-viewer")
viewer_role_id = find_role_id(admin_token, "signoz-viewer")
key_id = get_first_key_id(signoz, admin_token, sa_id)
# Create SA — forbidden.
resp = requests.post(
signoz.self.host_configs["8080"].get(SERVICE_ACCOUNT_BASE),
json={"name": "sa-fga-should-fail"},
@@ -211,7 +126,6 @@ def test_readonly_role_forbidden_operations(
)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"create SA: expected 403, got {resp.status_code}: {resp.text}"
# Update SA — forbidden.
resp = requests.put(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}"),
json={"name": "sa-fga-renamed"},
@@ -220,15 +134,9 @@ def test_readonly_role_forbidden_operations(
)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"update SA: expected 403, got {resp.status_code}: {resp.text}"
# Delete SA — forbidden.
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}"),
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
resp = requests.delete(signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}"), headers={"Authorization": f"Bearer {token}"}, timeout=5)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"delete SA: expected 403, got {resp.status_code}: {resp.text}"
# Assign role to SA — forbidden (needs attach on both SA and role).
resp = requests.post(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/roles"),
json={"id": viewer_role_id},
@@ -237,7 +145,6 @@ def test_readonly_role_forbidden_operations(
)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"assign SA role: expected 403, got {resp.status_code}: {resp.text}"
# Remove role from SA — forbidden (needs detach on both SA and role).
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/roles/{viewer_role_id}"),
headers={"Authorization": f"Bearer {token}"},
@@ -245,7 +152,6 @@ def test_readonly_role_forbidden_operations(
)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"remove SA role: expected 403, got {resp.status_code}: {resp.text}"
# Create key — forbidden (needs factor-api-key:create + serviceaccount:attach).
resp = requests.post(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/keys"),
json={"name": "fga-key-fail", "expiresAt": 0},
@@ -254,7 +160,6 @@ def test_readonly_role_forbidden_operations(
)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"create key: expected 403, got {resp.status_code}: {resp.text}"
# Revoke key — forbidden (needs factor-api-key:delete + serviceaccount:detach).
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/keys/{key_id}"),
headers={"Authorization": f"Bearer {token}"},
@@ -263,95 +168,30 @@ def test_readonly_role_forbidden_operations(
assert resp.status_code == HTTPStatus.FORBIDDEN, f"revoke key: expected 403, got {resp.status_code}: {resp.text}"
# ---------------------------------------------------------------------------
# 5. Grant write permissions, verify access opens up
# ---------------------------------------------------------------------------
def test_patch_role_add_write_permissions(
def test_grant_write_permissions(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
role_id = find_role_by_name(signoz, admin_token, SA_FGA_CUSTOM_ROLE_NAME)
role_id = find_role_id(admin_token, SA_FGA_CUSTOM_ROLE_NAME)
sa_id = find_service_account_by_name(signoz, admin_token, SA_FGA_TARGET_SA_NAME)["id"]
viewer_role_id = find_role_by_name(signoz, admin_token, "signoz-viewer")
viewer_role_id = find_role_id(admin_token, "signoz-viewer")
# Grant create on serviceaccount (now on serviceaccount type directly).
patch_role_objects(
signoz,
admin_token,
role_id,
"create",
additions=[
object_group("serviceaccount", "serviceaccount", ["*"]),
],
)
# Grant update on instances.
patch_role_objects(
signoz,
admin_token,
role_id,
"update",
additions=[
object_group("serviceaccount", "serviceaccount", ["*"]),
],
)
# Grant delete on instances.
patch_role_objects(
signoz,
admin_token,
role_id,
"delete",
additions=[
object_group("serviceaccount", "serviceaccount", ["*"]),
],
)
# Grant factor-api-key create/delete + serviceaccount attach/detach for key operations.
patch_role_objects(
signoz,
admin_token,
role_id,
"create",
additions=[
object_group("metaresource", "factor-api-key", ["*"]),
],
)
patch_role_objects(
signoz,
admin_token,
role_id,
"delete",
additions=[
object_group("metaresource", "factor-api-key", ["*"]),
],
)
patch_role_objects(
signoz,
admin_token,
role_id,
"attach",
additions=[
object_group("serviceaccount", "serviceaccount", ["*"]),
],
)
patch_role_objects(
signoz,
admin_token,
role_id,
"detach",
additions=[
object_group("serviceaccount", "serviceaccount", ["*"]),
],
resp = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"),
json={
"description": "",
"transactionGroups": [transaction_group(verb, "serviceaccount", "serviceaccount", ["*"]) for verb in ("read", "list", "create", "update", "delete", "attach", "detach")] + [transaction_group(verb, "metaresource", "factor-api-key", ["*"]) for verb in ("read", "list", "create", "delete")],
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
custom_token = get_token(SA_FGA_CUSTOM_USER_EMAIL, SA_FGA_CUSTOM_USER_PASSWORD)
# Create SA — now allowed.
resp = requests.post(
signoz.self.host_configs["8080"].get(SERVICE_ACCOUNT_BASE),
json={"name": "sa-fga-write-test"},
@@ -361,7 +201,6 @@ def test_patch_role_add_write_permissions(
assert resp.status_code == HTTPStatus.CREATED, f"create SA: {resp.text}"
new_sa_id = resp.json()["data"]["id"]
# Update SA — now allowed.
resp = requests.put(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{new_sa_id}"),
json={"name": "sa-fga-write-renamed"},
@@ -370,7 +209,6 @@ def test_patch_role_add_write_permissions(
)
assert resp.status_code == HTTPStatus.NO_CONTENT, f"update SA: {resp.text}"
# Create key — now allowed (factor-api-key:create + serviceaccount:attach).
key_resp = requests.post(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{new_sa_id}/keys"),
json={"name": "fga-write-key", "expiresAt": 0},
@@ -380,7 +218,6 @@ def test_patch_role_add_write_permissions(
assert key_resp.status_code == HTTPStatus.CREATED, f"create key: {key_resp.text}"
new_key_id = key_resp.json()["data"]["id"]
# Revoke key — now allowed (factor-api-key:delete + serviceaccount:detach).
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{new_sa_id}/keys/{new_key_id}"),
headers={"Authorization": f"Bearer {custom_token}"},
@@ -388,15 +225,9 @@ def test_patch_role_add_write_permissions(
)
assert resp.status_code == HTTPStatus.NO_CONTENT, f"revoke key: {resp.text}"
# Delete SA — now allowed.
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{new_sa_id}"),
headers={"Authorization": f"Bearer {custom_token}"},
timeout=5,
)
resp = requests.delete(signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{new_sa_id}"), headers={"Authorization": f"Bearer {custom_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.NO_CONTENT, f"delete SA: {resp.text}"
# Role assignment still forbidden (has attach on SA but not on role).
resp = requests.post(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/roles"),
json={"id": viewer_role_id},
@@ -405,7 +236,6 @@ def test_patch_role_add_write_permissions(
)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"assign SA role: expected 403, got {resp.status_code}: {resp.text}"
# Role removal still forbidden (has detach on SA but not on role).
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/roles/{viewer_role_id}"),
headers={"Authorization": f"Bearer {custom_token}"},
@@ -414,24 +244,17 @@ def test_patch_role_add_write_permissions(
assert resp.status_code == HTTPStatus.FORBIDDEN, f"remove SA role: expected 403, got {resp.status_code}: {resp.text}"
# ---------------------------------------------------------------------------
# 6. Dual-attach: SA attach only (no role attach) → assign forbidden
# ---------------------------------------------------------------------------
def test_attach_with_only_sa_attach_forbidden(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
sa_id = find_service_account_by_name(signoz, admin_token, SA_FGA_TARGET_SA_NAME)["id"]
viewer_role_id = find_role_by_name(signoz, admin_token, "signoz-viewer")
# SA attach already granted from previous test; role attach not yet granted.
viewer_role_id = find_role_id(admin_token, "signoz-viewer")
custom_token = get_token(SA_FGA_CUSTOM_USER_EMAIL, SA_FGA_CUSTOM_USER_PASSWORD)
# Assign role — forbidden (has SA attach, missing role attach).
resp = requests.post(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/roles"),
json={"id": viewer_role_id},
@@ -441,25 +264,17 @@ def test_attach_with_only_sa_attach_forbidden(
assert resp.status_code == HTTPStatus.FORBIDDEN, f"assign with only SA attach: expected 403, got {resp.status_code}: {resp.text}"
# ---------------------------------------------------------------------------
# 7. Dual-detach: SA detach only (no role detach) → remove forbidden
# ---------------------------------------------------------------------------
def test_detach_with_only_sa_detach_forbidden(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
sa_id = find_service_account_by_name(signoz, admin_token, SA_FGA_TARGET_SA_NAME)["id"]
viewer_role_id = find_role_by_name(signoz, admin_token, "signoz-viewer")
# SA detach already granted from test_patch_role_add_write_permissions;
# role detach not yet granted.
viewer_role_id = find_role_id(admin_token, "signoz-viewer")
custom_token = get_token(SA_FGA_CUSTOM_USER_EMAIL, SA_FGA_CUSTOM_USER_PASSWORD)
# Remove role — forbidden (has SA detach, missing role detach).
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/roles/{viewer_role_id}"),
headers={"Authorization": f"Bearer {custom_token}"},
@@ -468,34 +283,32 @@ def test_detach_with_only_sa_detach_forbidden(
assert resp.status_code == HTTPStatus.FORBIDDEN, f"remove with only SA detach: expected 403, got {resp.status_code}: {resp.text}"
# ---------------------------------------------------------------------------
# 8. Dual-attach: role attach only (no SA attach) → assign forbidden
# ---------------------------------------------------------------------------
def test_attach_with_only_role_attach_forbidden(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
role_id = find_role_by_name(signoz, admin_token, SA_FGA_CUSTOM_ROLE_NAME)
role_id = find_role_id(admin_token, SA_FGA_CUSTOM_ROLE_NAME)
sa_id = find_service_account_by_name(signoz, admin_token, SA_FGA_TARGET_SA_NAME)["id"]
viewer_role_id = find_role_by_name(signoz, admin_token, "signoz-viewer")
viewer_role_id = find_role_id(admin_token, "signoz-viewer")
# Remove SA attach, grant role attach.
patch_role_objects(
signoz,
admin_token,
role_id,
"attach",
additions=[object_group("role", "role", ["*"])],
deletions=[object_group("serviceaccount", "serviceaccount", ["*"])],
resp = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"),
json={
"description": "",
"transactionGroups": [transaction_group(verb, "serviceaccount", "serviceaccount", ["*"]) for verb in ("read", "list", "create", "update", "delete", "detach")]
+ [transaction_group(verb, "metaresource", "factor-api-key", ["*"]) for verb in ("read", "list", "create", "delete")]
+ [transaction_group("attach", "role", "role", ["*"])],
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
custom_token = get_token(SA_FGA_CUSTOM_USER_EMAIL, SA_FGA_CUSTOM_USER_PASSWORD)
# Assign role — forbidden (middleware SA attach check fails).
resp = requests.post(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/roles"),
json={"id": viewer_role_id},
@@ -505,34 +318,32 @@ def test_attach_with_only_role_attach_forbidden(
assert resp.status_code == HTTPStatus.FORBIDDEN, f"assign with only role attach: expected 403, got {resp.status_code}: {resp.text}"
# ---------------------------------------------------------------------------
# 9. Dual-detach: role detach only (no SA detach) → remove forbidden
# ---------------------------------------------------------------------------
def test_detach_with_only_role_detach_forbidden(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
role_id = find_role_by_name(signoz, admin_token, SA_FGA_CUSTOM_ROLE_NAME)
role_id = find_role_id(admin_token, SA_FGA_CUSTOM_ROLE_NAME)
sa_id = find_service_account_by_name(signoz, admin_token, SA_FGA_TARGET_SA_NAME)["id"]
viewer_role_id = find_role_by_name(signoz, admin_token, "signoz-viewer")
viewer_role_id = find_role_id(admin_token, "signoz-viewer")
# Remove SA detach, grant role detach.
patch_role_objects(
signoz,
admin_token,
role_id,
"detach",
additions=[object_group("role", "role", ["*"])],
deletions=[object_group("serviceaccount", "serviceaccount", ["*"])],
resp = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"),
json={
"description": "",
"transactionGroups": [transaction_group(verb, "serviceaccount", "serviceaccount", ["*"]) for verb in ("read", "list", "create", "update", "delete")]
+ [transaction_group(verb, "metaresource", "factor-api-key", ["*"]) for verb in ("read", "list", "create", "delete")]
+ [transaction_group(verb, "role", "role", ["*"]) for verb in ("attach", "detach")],
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
custom_token = get_token(SA_FGA_CUSTOM_USER_EMAIL, SA_FGA_CUSTOM_USER_PASSWORD)
# Remove role — forbidden (SA detach check fails).
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/roles/{viewer_role_id}"),
headers={"Authorization": f"Bearer {custom_token}"},
@@ -541,46 +352,32 @@ def test_detach_with_only_role_detach_forbidden(
assert resp.status_code == HTTPStatus.FORBIDDEN, f"remove with only role detach: expected 403, got {resp.status_code}: {resp.text}"
# ---------------------------------------------------------------------------
# 10. Both attach + detach → assign and remove succeed
# ---------------------------------------------------------------------------
def test_attach_detach_with_both_permissions_succeeds(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
role_id = find_role_by_name(signoz, admin_token, SA_FGA_CUSTOM_ROLE_NAME)
role_id = find_role_id(admin_token, SA_FGA_CUSTOM_ROLE_NAME)
sa_id = find_service_account_by_name(signoz, admin_token, SA_FGA_TARGET_SA_NAME)["id"]
# Add back SA attach and SA detach (role attach/detach already present from previous tests).
patch_role_objects(
signoz,
admin_token,
role_id,
"attach",
additions=[
object_group("serviceaccount", "serviceaccount", ["*"]),
],
)
patch_role_objects(
signoz,
admin_token,
role_id,
"detach",
additions=[
object_group("serviceaccount", "serviceaccount", ["*"]),
],
resp = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"),
json={
"description": "",
"transactionGroups": [transaction_group(verb, "serviceaccount", "serviceaccount", ["*"]) for verb in ("read", "list", "create", "update", "delete", "attach", "detach")]
+ [transaction_group(verb, "metaresource", "factor-api-key", ["*"]) for verb in ("read", "list", "create", "delete")]
+ [transaction_group(verb, "role", "role", ["*"]) for verb in ("attach", "detach")],
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
custom_token = get_token(SA_FGA_CUSTOM_USER_EMAIL, SA_FGA_CUSTOM_USER_PASSWORD)
editor_role_id = find_role_id(admin_token, "signoz-editor")
# The target SA currently has signoz-viewer assigned. Assign a different role.
editor_role_id = find_role_by_name(signoz, admin_token, "signoz-editor")
# Assign editor role — should succeed (both SA attach + role attach).
resp = requests.post(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/roles"),
json={"id": editor_role_id},
@@ -589,7 +386,6 @@ def test_attach_detach_with_both_permissions_succeeds(
)
assert resp.status_code == HTTPStatus.NO_CONTENT, f"assign with both attach: {resp.text}"
# Remove the editor role — should succeed (both SA detach + role detach).
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}/roles/{editor_role_id}"),
headers={"Authorization": f"Bearer {custom_token}"},
@@ -598,84 +394,51 @@ def test_attach_detach_with_both_permissions_succeeds(
assert resp.status_code == HTTPStatus.NO_CONTENT, f"remove with both detach: {resp.text}"
# ---------------------------------------------------------------------------
# 11. Revoke read/list → verify access lost
# ---------------------------------------------------------------------------
def test_remove_read_permissions_revokes_access(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
role_id = find_role_by_name(signoz, admin_token, SA_FGA_CUSTOM_ROLE_NAME)
role_id = find_role_id(admin_token, SA_FGA_CUSTOM_ROLE_NAME)
sa_id = find_service_account_by_name(signoz, admin_token, SA_FGA_TARGET_SA_NAME)["id"]
# Revoke read.
patch_role_objects(
signoz,
admin_token,
role_id,
"read",
deletions=[
object_group("serviceaccount", "serviceaccount", ["*"]),
],
)
# Revoke list (now on serviceaccount type directly).
patch_role_objects(
signoz,
admin_token,
role_id,
"list",
deletions=[
object_group("serviceaccount", "serviceaccount", ["*"]),
],
resp = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"),
json={
"description": "",
"transactionGroups": [transaction_group(verb, "serviceaccount", "serviceaccount", ["*"]) for verb in ("create", "update", "delete", "attach", "detach")]
+ [transaction_group(verb, "metaresource", "factor-api-key", ["*"]) for verb in ("read", "list", "create", "delete")]
+ [transaction_group(verb, "role", "role", ["*"]) for verb in ("attach", "detach")],
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text
custom_token = get_token(SA_FGA_CUSTOM_USER_EMAIL, SA_FGA_CUSTOM_USER_PASSWORD)
# List SAs — forbidden.
resp = requests.get(
signoz.self.host_configs["8080"].get(SERVICE_ACCOUNT_BASE),
headers={"Authorization": f"Bearer {custom_token}"},
timeout=5,
)
resp = requests.get(signoz.self.host_configs["8080"].get(SERVICE_ACCOUNT_BASE), headers={"Authorization": f"Bearer {custom_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"list SAs after revoke: expected 403, got {resp.status_code}: {resp.text}"
# Get SA — forbidden.
resp = requests.get(
signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}"),
headers={"Authorization": f"Bearer {custom_token}"},
timeout=5,
)
resp = requests.get(signoz.self.host_configs["8080"].get(f"{SERVICE_ACCOUNT_BASE}/{sa_id}"), headers={"Authorization": f"Bearer {custom_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.FORBIDDEN, f"get SA after revoke: expected 403, got {resp.status_code}: {resp.text}"
# ---------------------------------------------------------------------------
# 12. Clean up: delete custom role
# ---------------------------------------------------------------------------
def test_delete_custom_role_cleanup(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
find_role_id: Callable[[str, str], str],
):
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
role_id = find_role_by_name(signoz, admin_token, SA_FGA_CUSTOM_ROLE_NAME)
role_id = find_role_id(admin_token, SA_FGA_CUSTOM_ROLE_NAME)
user = find_user_by_email(signoz, admin_token, SA_FGA_CUSTOM_USER_EMAIL)
# Remove the custom role from the user first — role deletion requires no assignees.
resp = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user['id']}/roles"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=5,
)
resp = requests.get(signoz.self.host_configs["8080"].get(f"/api/v2/users/{user['id']}/roles"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.OK, resp.text
roles = resp.json()["data"]
custom_entry = next((r for r in roles if r["name"] == SA_FGA_CUSTOM_ROLE_NAME), None)
custom_entry = next((r for r in resp.json()["data"] if r["name"] == SA_FGA_CUSTOM_ROLE_NAME), None)
if custom_entry is not None:
resp = requests.delete(
signoz.self.host_configs["8080"].get(f"/api/v2/users/{user['id']}/roles/{custom_entry['id']}"),
@@ -684,4 +447,5 @@ def test_delete_custom_role_cleanup(
)
assert resp.status_code == HTTPStatus.NO_CONTENT, f"remove role from user: {resp.text}"
delete_custom_role(signoz, admin_token, role_id)
resp = requests.delete(signoz.self.host_configs["8080"].get(f"/api/v1/roles/{role_id}"), headers={"Authorization": f"Bearer {admin_token}"}, timeout=5)
assert resp.status_code == HTTPStatus.NO_CONTENT, resp.text