Tests: disable localhost IP bypass via LOCAL_HOST_IP_BYPASS env, switch body-size tests to MAX_BODY_SIZE_BYTES override, add Redis HA token-revocation integration tests, and validate production environment at startup

This commit is contained in:
seniorswe
2025-10-07 00:43:58 -04:00
parent 3c3fabef8c
commit fa9e974ed4
6 changed files with 234 additions and 6 deletions

View File

@@ -99,16 +99,67 @@ async def app_lifespan(app: FastAPI):
if not os.getenv('JWT_SECRET_KEY'):
raise RuntimeError('JWT_SECRET_KEY is not configured. Set it before starting the server.')
# Production environment validation
try:
if os.getenv('ENV', '').lower() == 'production':
# Validate HTTPS
https_only = os.getenv('HTTPS_ONLY', 'false').lower() == 'true'
https_enabled = os.getenv('HTTPS_ENABLED', 'false').lower() == 'true'
if not (https_only or https_enabled):
raise RuntimeError(
'In production (ENV=production), you must enable HTTPS_ONLY or HTTPS_ENABLED to enforce Secure cookies.'
)
except Exception as e:
# Validate JWT secret is not default
jwt_secret = os.getenv('JWT_SECRET_KEY', '')
if jwt_secret in ('please-change-me', 'test-secret-key', 'test-secret-key-please-change', ''):
raise RuntimeError(
'In production (ENV=production), JWT_SECRET_KEY must be changed from default value. '
'Generate a strong random secret (32+ characters).'
)
# Validate Redis for HA deployments (shared token revocation and rate limiting)
mem_or_external = os.getenv('MEM_OR_EXTERNAL', 'MEM').upper()
if mem_or_external == 'MEM':
gateway_logger.warning(
'Production deployment with MEM_OR_EXTERNAL=MEM detected. '
'Token revocation and rate limiting will NOT be shared across nodes. '
'For HA deployments, set MEM_OR_EXTERNAL=REDIS or EXTERNAL with valid REDIS_HOST. '
'Current setup is only suitable for single-node deployments.'
)
else:
# Verify Redis is actually configured
redis_host = os.getenv('REDIS_HOST')
if not redis_host:
raise RuntimeError(
'In production with MEM_OR_EXTERNAL=REDIS/EXTERNAL, REDIS_HOST is required. '
'Redis is essential for shared token revocation and rate limiting in HA deployments.'
)
# Validate CORS security
if os.getenv('CORS_STRICT', 'false').lower() != 'true':
gateway_logger.warning(
'Production deployment without CORS_STRICT=true. '
'This allows wildcard origins with credentials, which is a security risk.'
)
allowed_origins = os.getenv('ALLOWED_ORIGINS', '')
if '*' in allowed_origins:
raise RuntimeError(
'In production (ENV=production), wildcard CORS origins (*) are not allowed. '
'Set ALLOWED_ORIGINS to specific domain(s): https://yourdomain.com'
)
# Validate encryption keys if memory dumps are used
if mem_or_external == 'MEM':
mem_encryption_key = os.getenv('MEM_ENCRYPTION_KEY', '')
if not mem_encryption_key or len(mem_encryption_key) < 32:
gateway_logger.error(
'Production memory-only mode requires MEM_ENCRYPTION_KEY (32+ characters) for secure dumps. '
'Without this, memory dumps will be unencrypted on disk.'
)
except Exception as e:
# Re-raise all RuntimeErrors (validation failures should stop startup)
raise
app.state.redis = Redis.from_url(
f'redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/{os.getenv("REDIS_DB")}',

View File

@@ -130,6 +130,9 @@ async def test_localhost_bypass_enabled_allows_without_forwarding_headers(monkey
@pytest.mark.asyncio
async def test_localhost_bypass_disabled_blocks_without_forwarding_headers(monkeypatch, authed_client, client):
# Disable localhost bypass via environment variable (overrides database setting)
monkeypatch.setenv('LOCAL_HOST_IP_BYPASS', 'false')
# Restrictive whitelist and disable localhost bypass
await _update_security(
authed_client,

View File

@@ -45,6 +45,8 @@ async def test_ip_policy_allows_exact_ip(monkeypatch, authed_client):
@pytest.mark.asyncio
async def test_ip_policy_denies_exact_ip(monkeypatch, authed_client):
import services.gateway_service as gs
# Disable localhost bypass to allow IP blacklist to work
monkeypatch.setenv('LOCAL_HOST_IP_BYPASS', 'false')
# blacklist exact client IP
name, ver = await _setup_api_public(authed_client, 'ipdeny1', 'v1', mode='allow_all', bl=['127.0.0.1'])
monkeypatch.setattr(gs.httpx, 'AsyncClient', _FakeAsyncClient)
@@ -66,6 +68,8 @@ async def test_ip_policy_allows_cidr(monkeypatch, authed_client):
@pytest.mark.asyncio
async def test_ip_policy_denies_cidr(monkeypatch, authed_client):
import services.gateway_service as gs
# Disable localhost bypass to allow IP blacklist to work
monkeypatch.setenv('LOCAL_HOST_IP_BYPASS', 'false')
name, ver = await _setup_api_public(authed_client, 'ipdeny2', 'v1', mode='allow_all', bl=['127.0.0.0/24'])
monkeypatch.setattr(gs.httpx, 'AsyncClient', _FakeAsyncClient)
r = await authed_client.get(f'/api/rest/{name}/{ver}/res')
@@ -76,6 +80,8 @@ async def test_ip_policy_denies_cidr(monkeypatch, authed_client):
@pytest.mark.asyncio
async def test_ip_policy_denylist_precedence_over_allowlist(monkeypatch, authed_client):
import services.gateway_service as gs
# Disable localhost bypass to allow IP blacklist to work
monkeypatch.setenv('LOCAL_HOST_IP_BYPASS', 'false')
name, ver = await _setup_api_public(authed_client, 'ipdeny3', 'v1', mode='whitelist', wl=['127.0.0.1'], bl=['127.0.0.1'])
monkeypatch.setattr(gs.httpx, 'AsyncClient', _FakeAsyncClient)
r = await authed_client.get(f'/api/rest/{name}/{ver}/res')
@@ -86,6 +92,8 @@ async def test_ip_policy_denylist_precedence_over_allowlist(monkeypatch, authed_
@pytest.mark.asyncio
async def test_ip_policy_enforced_early_returns_http_error(monkeypatch, authed_client):
import services.gateway_service as gs
# Disable localhost bypass to allow IP whitelist to work
monkeypatch.setenv('LOCAL_HOST_IP_BYPASS', 'false')
# mode whitelist without including client IP -> API010
name, ver = await _setup_api_public(authed_client, 'ipdeny4', 'v1', mode='whitelist', wl=['203.0.113.5'])
monkeypatch.setattr(gs.httpx, 'AsyncClient', _FakeAsyncClient)

View File

@@ -60,8 +60,8 @@ async def test_metrics_bytes_in_uses_content_length(monkeypatch, authed_client):
@pytest.mark.asyncio
async def test_response_envelope_for_non_json_error(monkeypatch, client):
# Force small MAX_BODY_SIZE and send text/plain to platform auth -> 413 envelope
import doorman as appmod
monkeypatch.setattr(appmod, 'MAX_BODY_SIZE', 10, raising=False)
# Set environment variable to override body size limit
monkeypatch.setenv('MAX_BODY_SIZE_BYTES', '10')
payload = 'x' * 100
r = await client.post('/platform/authorization', content=payload, headers={'Content-Type': 'text/plain'})

View File

@@ -0,0 +1,167 @@
"""
Integration test for Redis-backed token revocation in HA deployments.
Simulates multi-node scenario:
- User logs in and gets token
- User logs out on "Node A" (revokes JTI in Redis)
- Token validation on "Node B" (different process) should fail
"""
import pytest
import os
@pytest.mark.asyncio
async def test_redis_token_revocation_shared_across_processes(monkeypatch, authed_client):
    """Verify a JTI revoked on one node is seen as revoked by another.

    Flow: log in for a real token, revoke its JTI through Redis (Node A),
    then re-initialize the Redis client to mimic a second process (Node B)
    and confirm the revocation is visible there.
    """
    # Pin the backend to Redis so revocations go through the shared store.
    monkeypatch.setenv('MEM_OR_EXTERNAL', 'REDIS')

    from utils import auth_blacklist

    # Drop any cached client and reconnect, as a fresh process would.
    auth_blacklist._redis_client = None
    auth_blacklist._redis_enabled = False
    auth_blacklist._init_redis_if_possible()
    if not auth_blacklist._redis_enabled or auth_blacklist._redis_client is None:
        pytest.skip('Redis not available for HA revocation test')

    # Obtain an access token through the real login endpoint.
    resp = await authed_client.post(
        '/platform/authorization',
        json={
            'email': os.environ.get('STARTUP_ADMIN_EMAIL'),
            'password': os.environ.get('STARTUP_ADMIN_PASSWORD'),
        },
    )
    assert resp.status_code == 200
    token = resp.json().get('access_token')
    assert token is not None

    # Extract JTI, subject and expiry from the token claims.
    from jose import jwt
    claims = jwt.decode(
        token,
        os.environ.get('JWT_SECRET_KEY'),
        algorithms=['HS256'],
    )
    jti = claims.get('jti')
    username = claims.get('sub')
    exp = claims.get('exp')
    assert jti is not None
    assert username is not None

    # Node A: revoke the JTI with a TTL matching the token's remaining life.
    import time
    ttl = max(1, int(exp - time.time())) if exp else 3600
    auth_blacklist.add_revoked_jti(username, jti, ttl)

    # Node B: reconnect from scratch and check the revocation is visible.
    auth_blacklist._redis_client = None
    auth_blacklist._redis_enabled = False
    auth_blacklist._init_redis_if_possible()
    is_revoked = auth_blacklist.is_jti_revoked(username, jti)
    assert is_revoked is True, 'Token should be revoked in Redis (visible across nodes)'

    # Cleanup: drop the revocation key so the test leaves no residue.
    if auth_blacklist._redis_client:
        auth_blacklist._redis_client.delete(
            auth_blacklist._revoked_jti_key(username, jti)
        )
@pytest.mark.asyncio
async def test_redis_revoke_all_for_user_shared_across_processes(monkeypatch):
    """User-wide revocation written on one node must be readable on another."""
    monkeypatch.setenv('MEM_OR_EXTERNAL', 'REDIS')
    from utils import auth_blacklist

    def _reconnect():
        # Mimic a fresh process: drop the cached client and re-init.
        auth_blacklist._redis_client = None
        auth_blacklist._redis_enabled = False
        auth_blacklist._init_redis_if_possible()

    _reconnect()
    if not auth_blacklist._redis_enabled:
        pytest.skip('Redis not available for HA revocation test')

    test_username = 'test_user_revoke_all'
    # Node A: blanket-revoke every token for the user.
    auth_blacklist.revoke_all_for_user(test_username)

    # Node B: reconnect and confirm the revocation is visible.
    _reconnect()
    assert auth_blacklist.is_user_revoked(test_username) is True, (
        'User revocation should be visible across nodes'
    )

    # Cleanup, then confirm the flag actually clears.
    auth_blacklist.unrevoke_all_for_user(test_username)
    assert auth_blacklist.is_user_revoked(test_username) is False
@pytest.mark.asyncio
async def test_redis_token_revocation_ttl_expiry(monkeypatch):
    """A revoked JTI must disappear from Redis once its TTL elapses.

    Fixes over the original:
    - ``await asyncio.sleep`` instead of blocking ``time.sleep`` so the
      event loop stays responsive inside this async test.
    - Polls with a hard deadline instead of a single fixed sleep, so the
      test does not flake if Redis expires the key slightly after the
      nominal TTL.
    """
    import asyncio
    monkeypatch.setenv('MEM_OR_EXTERNAL', 'REDIS')
    from utils import auth_blacklist

    # Reset cached state and reconnect against the Redis backend.
    auth_blacklist._redis_client = None
    auth_blacklist._redis_enabled = False
    auth_blacklist._init_redis_if_possible()
    if not auth_blacklist._redis_enabled:
        pytest.skip('Redis not available for TTL test')

    test_username = 'test_user_ttl'
    test_jti = 'test_jti_expires_soon'
    # Revoke with a short TTL so the test completes quickly.
    auth_blacklist.add_revoked_jti(test_username, test_jti, ttl_seconds=2)
    # Immediately after revocation the JTI must read as revoked.
    assert auth_blacklist.is_jti_revoked(test_username, test_jti) is True

    # Poll (non-blocking) until the key expires, bounded by a deadline.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 6.0
    while auth_blacklist.is_jti_revoked(test_username, test_jti):
        if loop.time() > deadline:
            break
        await asyncio.sleep(0.25)

    # TTL elapsed: the revocation must no longer be reported.
    assert auth_blacklist.is_jti_revoked(test_username, test_jti) is False
@pytest.mark.asyncio
async def test_memory_fallback_when_redis_unavailable(monkeypatch):
    """With MEM_OR_EXTERNAL=MEM the in-memory revocation path must still work."""
    # Force memory-only mode for this test.
    monkeypatch.setenv('MEM_OR_EXTERNAL', 'MEM')
    from utils import auth_blacklist

    # Clear any cached Redis state so init honours the MEM setting.
    auth_blacklist._redis_client = None
    auth_blacklist._redis_enabled = False
    auth_blacklist._init_redis_if_possible()

    # Memory mode: no Redis client may have been created.
    assert auth_blacklist._redis_enabled is False
    assert auth_blacklist._redis_client is None

    # Revocation should still function via the process-local store.
    user, jti = 'test_memory_user', 'test_memory_jti'
    auth_blacklist.add_revoked_jti(user, jti, ttl_seconds=60)
    assert auth_blacklist.is_jti_revoked(user, jti) is True
    # NOTE: in MEM mode this revocation is per-process only — it is not
    # shared across nodes, which is the known limitation for HA deployments.

View File

@@ -21,9 +21,8 @@ async def test_security_headers_and_hsts(monkeypatch, client):
@pytest.mark.asyncio
async def test_body_size_limit_returns_413(monkeypatch, client):
import doorman as appmod
monkeypatch.setattr(appmod, 'MAX_BODY_SIZE', 10, raising=False)
# Set environment variable to override body size limit
monkeypatch.setenv('MAX_BODY_SIZE_BYTES', '10')
payload = 'x' * 100
r = await client.post('/platform/authorization', content=payload, headers={'Content-Type': 'text/plain'})
assert r.status_code == 413