Files
doorman/backend-services/utils/role_util.py
2025-12-10 23:09:05 -05:00

147 lines
5.2 KiB
Python

"""
The contents of this file are property of Doorman Dev, LLC
Review the Apache License 2.0 for valid authorization of use
See https://github.com/pypeople-dev/doorman for more information
"""
import logging
from fastapi import HTTPException
from utils.database import role_collection, user_collection
from utils.doorman_cache_util import doorman_cache
logger = logging.getLogger('doorman.gateway')
def _strip_id(r):
try:
if r and r.get('_id'):
del r['_id']
except Exception:
pass
return r
async def is_admin_role(role_name: str) -> bool:
"""Return True if the given role is the admin role.
Considers role.platform_admin flag for backward compatibility and
treats names 'admin' and legacy 'platform admin' as admin.
"""
try:
role = doorman_cache.get_cache('role_cache', role_name)
if not role:
role = role_collection.find_one({'role_name': role_name})
role = _strip_id(role)
if role:
doorman_cache.set_cache('role_cache', role_name, role)
if not role:
rn = (role_name or '').strip().lower()
return rn in ('admin', 'platform admin')
if role.get('platform_admin') is True:
return True
rn = (role.get('role_name') or '').strip().lower()
return rn in ('admin', 'platform admin')
except Exception:
return False
async def is_admin_user(username: str) -> bool:
"""Return True if the user has the admin role."""
try:
user = doorman_cache.get_cache('user_cache', username)
if not user:
user = user_collection.find_one({'username': username})
user = _strip_id(user)
if user:
doorman_cache.set_cache('user_cache', username, user)
if not user:
return False
return await is_admin_role(user.get('role'))
except Exception:
return False
async def is_platform_admin_role(role_name: str) -> bool:
return await is_admin_role(role_name)
async def is_platform_admin_user(username: str) -> bool:
return await is_admin_user(username)
async def validate_platform_role(role_name, action):
"""
Get the platform roles from the cache or database.
"""
try:
role = doorman_cache.get_cache('role_cache', role_name)
if not role:
role = role_collection.find_one({'role_name': role_name})
if not role:
raise HTTPException(status_code=404, detail='Role not found')
if role.get('_id'):
del role['_id']
doorman_cache.set_cache('role_cache', role_name, role)
if action == 'manage_users' and role.get('manage_users'):
return True
elif action == 'manage_apis' and role.get('manage_apis'):
return True
elif action == 'manage_endpoints' and role.get('manage_endpoints'):
return True
elif action == 'manage_groups' and role.get('manage_groups'):
return True
elif action == 'manage_roles' and role.get('manage_roles'):
return True
elif action == 'manage_routings' and role.get('manage_routings'):
return True
elif action == 'manage_gateway' and role.get('manage_gateway'):
return True
elif action == 'manage_subscriptions' and role.get('manage_subscriptions'):
return True
elif action == 'manage_security' and role.get('manage_security'):
return True
elif action == 'manage_tiers' and role.get('manage_tiers'):
return True
elif action == 'manage_rate_limits' and role.get('manage_rate_limits'):
return True
elif action == 'manage_credits' and role.get('manage_credits'):
return True
elif action == 'manage_auth' and role.get('manage_auth'):
return True
elif action == 'view_logs' and role.get('view_logs'):
return True
elif action == 'export_logs' and role.get('export_logs'):
return True
elif action == 'view_analytics' and role.get('view_analytics'):
return True
return False
except Exception as e:
logger.error(f'validate_platform_role error: {e}')
raise HTTPException(status_code=500, detail='Internal Server Error')
async def platform_role_required_bool(username, action):
try:
user = doorman_cache.get_cache('user_cache', username)
if not user:
user = user_collection.find_one({'username': username})
if not user:
raise HTTPException(status_code=404, detail='User not found')
if user.get('_id'):
del user['_id']
if user.get('password'):
del user['password']
doorman_cache.set_cache('user_cache', username, user)
if not user:
raise HTTPException(status_code=404, detail='User not found')
if not await validate_platform_role(user.get('role'), action):
raise HTTPException(status_code=403, detail='You do not have the correct role for this')
return True
except HTTPException:
return False
except Exception as e:
logger.error(f'Unexpected error: {e}')
return False