Files
doorman/backend-services/utils/ip_policy_util.py
T

166 lines
5.7 KiB
Python

from __future__ import annotations
import os
from fastapi import HTTPException, Request
from utils.audit_util import audit
from utils.security_settings_util import get_cached_settings
def _get_client_ip(request: Request, trust_xff: bool) -> str | None:
"""Determine client IP with optional proxy trust.
When `trust_xff` is True, this prefers headers supplied by trusted proxies.
Trusted proxies are matched against `xff_trusted_proxies` (IPs/CIDRs) in
security settings. If that list is empty, any proxy is implicitly trusted
to preserve backwards-compatibility.
"""
try:
settings = get_cached_settings()
trusted = settings.get('xff_trusted_proxies') or []
src_ip = request.client.host if request.client else None
if isinstance(src_ip, str) and src_ip in ('testserver', 'localhost'):
src_ip = '127.0.0.1'
def _from_trusted_proxy() -> bool:
if not trusted:
# Empty list means trust all proxies for backwards-compatibility
return True
return _ip_in_list(src_ip, trusted) if src_ip else False
if trust_xff and _from_trusted_proxy():
for header in (
'x-forwarded-for',
'X-Forwarded-For',
'x-real-ip',
'X-Real-IP',
'cf-connecting-ip',
'CF-Connecting-IP',
):
val = request.headers.get(header)
if val:
ip = val.split(',')[0].strip()
if ip:
return ip
return src_ip
except Exception:
return request.client.host if request.client else None
def _ip_in_list(ip: str, patterns: list[str]) -> bool:
try:
import ipaddress
ip_obj = ipaddress.ip_address(ip)
for pat in patterns or []:
p = (pat or '').strip()
if not p:
continue
try:
if '/' in p:
net = ipaddress.ip_network(p, strict=False)
if ip_obj in net:
return True
else:
if ip_obj == ipaddress.ip_address(p):
return True
except Exception:
continue
return False
except Exception:
return False
def _is_loopback(ip: str | None) -> bool:
try:
if not ip:
return False
if ip in ('testserver', 'localhost'):
return True
import ipaddress
return ipaddress.ip_address(ip).is_loopback
except Exception:
return False
def enforce_api_ip_policy(request: Request, api: dict):
"""
Enforce per-API IP policy.
- api_ip_mode: 'allow_all' (default) or 'whitelist'
- api_ip_whitelist: applied when mode=='whitelist'
- api_ip_blacklist: always applied (deny)
- api_trust_x_forwarded_for: override; otherwise use platform trust_x_forwarded_for
"""
try:
settings = get_cached_settings()
trust_xff = (
bool(api.get('api_trust_x_forwarded_for'))
if api.get('api_trust_x_forwarded_for') is not None
else bool(settings.get('trust_x_forwarded_for'))
)
client_ip = _get_client_ip(request, trust_xff)
if not client_ip:
return
try:
settings = get_cached_settings()
env_flag = os.getenv('LOCAL_HOST_IP_BYPASS')
allow_local = (
(env_flag.lower() == 'true')
if isinstance(env_flag, str) and env_flag.strip() != ''
else bool(settings.get('allow_localhost_bypass'))
)
direct_ip = getattr(getattr(request, 'client', None), 'host', None)
has_forward = any(
request.headers.get(h)
for h in (
'x-forwarded-for',
'X-Forwarded-For',
'x-real-ip',
'X-Real-IP',
'cf-connecting-ip',
'CF-Connecting-IP',
'forwarded',
'Forwarded',
)
)
if allow_local and direct_ip and _is_loopback(direct_ip) and not has_forward:
return
except Exception:
pass
mode = (api.get('api_ip_mode') or 'allow_all').strip().lower()
wl = api.get('api_ip_whitelist') or []
bl = api.get('api_ip_blacklist') or []
if bl and _ip_in_list(client_ip, bl):
try:
audit(
request,
actor=None,
action='ip.api_deny',
target=str(api.get('api_id') or api.get('api_name') or 'unknown_api'),
status='blocked',
details={'reason': 'blacklisted', 'effective_ip': client_ip},
)
except Exception:
pass
raise HTTPException(status_code=403, detail='API011')
if mode == 'whitelist':
if not wl or not _ip_in_list(client_ip, wl):
try:
audit(
request,
actor=None,
action='ip.api_deny',
target=str(api.get('api_id') or api.get('api_name') or 'unknown_api'),
status='blocked',
details={'reason': 'not_in_whitelist', 'effective_ip': client_ip},
)
except Exception:
pass
raise HTTPException(status_code=403, detail='API010')
except HTTPException:
raise
except Exception:
return