"""
|
|
The contents of this file are property of Doorman Dev, LLC
|
|
Review the Apache License 2.0 for valid authorization of use
|
|
See https://github.com/apidoorman/doorman for more information
|
|
"""
|
|
|
|
import asyncio
import json
import logging
import multiprocessing
import os
import re
import shutil
import signal
import subprocess
import sys
import time
import uuid
from contextlib import asynccontextmanager
from datetime import timedelta
from logging.handlers import RotatingFileHandler
from pathlib import Path

import uvicorn
from dotenv import load_dotenv, find_dotenv
from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.routing import APIRoute
from jose import JWTError
from pydantic import BaseSettings
from redis.asyncio import Redis
from starlette.middleware.base import BaseHTTPMiddleware

try:
    if sys.version_info >= (3, 13):
        try:
            from importlib.metadata import PackageNotFoundError, version
        except Exception:
            version = None
            PackageNotFoundError = Exception
        if version is not None:
            try:
                v = version('aiohttp')
                parts = [
                    int(p)
                    for p in (v.split('.')[:3] + ['0', '0'])[:3]
                    if p.isdigit() or p.isnumeric()
                ]
                while len(parts) < 3:
                    parts.append(0)
                if tuple(parts) < (3, 10, 10):
                    raise SystemExit(
                        f'Incompatible aiohttp {v} detected on Python {sys.version.split()[0]}. '
                        'Please upgrade to aiohttp>=3.10.10 (pip install -U aiohttp) or run with Python 3.11.'
                    )
            except PackageNotFoundError:
                pass
            except Exception:
                pass
except Exception:
    pass
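# Illustration of the guard above: version('aiohttp') == '3.9.5' parses to
# parts == [3, 9, 5], and (3, 9, 5) < (3, 10, 10) triggers the SystemExit;
# '3.10.10' or newer passes. A pre-release such as '3.11.0b1' drops the
# non-numeric piece and is zero-padded to (3, 11, 0) before the comparison.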

from models.response_model import ResponseModel
from routes.analytics_routes import analytics_router
from middleware.analytics_middleware import setup_analytics_middleware
from utils.analytics_scheduler import analytics_scheduler
from routes.api_routes import api_router
from routes.authorization_routes import authorization_router
from routes.config_hot_reload_routes import config_hot_reload_router
from routes.config_routes import config_router
from routes.credit_routes import credit_router
from routes.dashboard_routes import dashboard_router
from routes.demo_routes import demo_router
from routes.endpoint_routes import endpoint_router
from routes.gateway_routes import gateway_router
from routes.group_routes import group_router
from routes.logging_routes import logging_router
from routes.memory_routes import memory_router
from routes.metrics_routes import metrics_router
from routes.monitor_routes import monitor_router
from routes.proto_routes import proto_router
from routes.quota_routes import quota_router
from routes.rate_limit_rule_routes import rate_limit_rule_router
from routes.role_routes import role_router
from routes.routing_routes import routing_router
from routes.security_routes import security_router
from routes.subscription_routes import subscription_router
from routes.tier_routes import tier_router
from routes.tools_routes import tools_router
from routes.user_routes import user_router
from routes.vault_routes import vault_router
from routes.openapi_routes import openapi_router
from routes.wsdl_routes import wsdl_router
from routes.graphql_routes import graphql_routes_router
from routes.grpc_routes import grpc_router
from utils.audit_util import audit
from middleware.security_audit_middleware import SecurityAuditMiddleware
from middleware.logging_middleware import GlobalLoggingMiddleware
from middleware.latency_injection_middleware import LatencyInjectionMiddleware
from middleware.websocket_reject_middleware import WebSocketRejectMiddleware
from utils.auth_blacklist import purge_expired_tokens
from utils.cache_manager_util import cache_manager
from utils.database import database
from utils.hot_reload_config import hot_config
from utils.ip_policy_util import _get_client_ip as _policy_get_client_ip
from utils.ip_policy_util import _ip_in_list as _policy_ip_in_list
from utils.ip_policy_util import _is_loopback as _policy_is_loopback
from utils.memory_dump_util import (
    dump_memory_to_file,
    find_latest_dump_path,
    restore_memory_from_file,
)
from utils.metrics_util import metrics_store
from utils.enhanced_metrics_util import enhanced_metrics_store
from utils.response_util import process_response
from utils.security_settings_util import (
    get_cached_settings,
    load_settings,
    start_auto_save_task,
    stop_auto_save_task,
)

# Avoid loading developer .env while running under pytest so tests fully
# control environment via monkeypatch without hidden defaults.
try:
    import sys as _sys

    _running_under_pytest = (
        'PYTEST_CURRENT_TEST' in os.environ or 'pytest' in _sys.modules
    )
except Exception:
    _running_under_pytest = False
if not _running_under_pytest:
    load_dotenv(find_dotenv(usecwd=True))

PID_FILE = 'doorman.pid'


def _migrate_generated_directory() -> None:
    """Migrate legacy root-level 'generated/' into backend-services/generated.

    Older defaults wrote files to a CWD-relative 'generated/' which could be at
    the repo root. Normalize by moving files into backend-services/generated.
    """
    try:
        be_root = Path(__file__).resolve().parent
        project_root = be_root.parent
        src = project_root / 'generated'
        dst = be_root / 'generated'
        if src == dst:
            return
        if not src.exists() or not src.is_dir():
            dst.mkdir(exist_ok=True)
            gateway_logger.info(f'Generated dir: {dst} (no migration needed)')
            return
        dst.mkdir(parents=True, exist_ok=True)
        moved_count = 0
        for path in src.rglob('*'):
            if path.is_dir():
                continue
            rel = path.relative_to(src)
            dest_file = dst / rel
            dest_file.parent.mkdir(parents=True, exist_ok=True)
            try:
                shutil.move(str(path), str(dest_file))
            except Exception:
                try:
                    shutil.copy2(str(path), str(dest_file))
                    path.unlink(missing_ok=True)
                except Exception:
                    continue
            moved_count += 1
        try:
            shutil.rmtree(src)
        except Exception:
            pass
        gateway_logger.info(f'Generated dir migrated: {moved_count} file(s) moved to {dst}')
    except Exception as e:
        try:
            gateway_logger.warning(f'Generated dir migration skipped: {e}')
        except Exception:
            pass


async def validate_database_connections():
    """Validate database connections on startup with retry logic"""
    gateway_logger.info('Validating database connections...')

    max_retries = 3
    for attempt in range(max_retries):
        try:
            from utils.database import user_collection

            await user_collection.find_one({})
            gateway_logger.info('✓ MongoDB connection verified')
            break
        except Exception as e:
            if attempt < max_retries - 1:
                wait = 2**attempt
                gateway_logger.warning(
                    f'MongoDB connection attempt {attempt + 1}/{max_retries} failed: {e}'
                )
                gateway_logger.info(f'Retrying in {wait} seconds...')
                await asyncio.sleep(wait)
            else:
                gateway_logger.error(f'MongoDB connection failed after {max_retries} attempts')
                raise RuntimeError(f'Cannot connect to MongoDB: {e}') from e

    redis_host = os.getenv('REDIS_HOST')
    mem_or_external = os.getenv('MEM_OR_EXTERNAL', 'MEM')

    if redis_host and mem_or_external == 'REDIS':
        for attempt in range(max_retries):
            try:
                import redis.asyncio as redis

                redis_url = f'redis://{redis_host}:{os.getenv("REDIS_PORT", "6379")}'
                if os.getenv('REDIS_PASSWORD'):
                    redis_url = f'redis://:{os.getenv("REDIS_PASSWORD")}@{redis_host}:{os.getenv("REDIS_PORT", "6379")}'

                r = redis.from_url(redis_url)
                await r.ping()
                await r.close()
                gateway_logger.info('✓ Redis connection verified')
                break
            except Exception as e:
                if attempt < max_retries - 1:
                    wait = 2**attempt
                    gateway_logger.warning(
                        f'Redis connection attempt {attempt + 1}/{max_retries} failed: {e}'
                    )
                    gateway_logger.info(f'Retrying in {wait} seconds...')
                    await asyncio.sleep(wait)
                else:
                    gateway_logger.error(f'Redis connection failed after {max_retries} attempts')
                    raise RuntimeError(f'Cannot connect to Redis: {e}') from e

    gateway_logger.info('All database connections validated successfully')
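    # The retry loops above use exponential backoff: attempt 0 waits 1s,
    # attempt 1 waits 2s, and the third failure raises. Worst-case startup
    # delay before a hard failure is therefore ~3s per backend, plus any
    # connection timeouts.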


def validate_token_revocation_config() -> None:
    """
    Validate that token revocation is safe for multi-worker deployments.
    """
    threads = int(os.getenv('THREADS', '1'))
    mem_mode = os.getenv('MEM_OR_EXTERNAL', 'MEM')
    if threads > 1 and mem_mode == 'MEM':
        gateway_logger.error(
            'CRITICAL: Multi-worker mode (THREADS > 1) with in-memory storage '
            'does not provide consistent token revocation across workers. '
            f'Current config: THREADS={threads}, MEM_OR_EXTERNAL={mem_mode}'
        )
        gateway_logger.error(
            'Token revocation requires Redis in multi-worker mode. '
            'Either set MEM_OR_EXTERNAL=REDIS or set THREADS=1'
        )
        raise RuntimeError(
            'Token revocation requires Redis in multi-worker mode (THREADS > 1). '
            'Set MEM_OR_EXTERNAL=REDIS or THREADS=1'
        )
    gateway_logger.info(f'Token revocation mode: {mem_mode} with {threads} worker(s)')
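    # Configurations this check accepts or rejects (illustrative):
    #   THREADS=1 MEM_OR_EXTERNAL=MEM     -> ok (single worker, local revocation list)
    #   THREADS=4 MEM_OR_EXTERNAL=REDIS   -> ok (revocations shared via Redis)
    #   THREADS=4 MEM_OR_EXTERNAL=MEM     -> RuntimeError at startup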


@asynccontextmanager
async def app_lifespan(app: FastAPI):
    if os.getenv('MEM_OR_EXTERNAL', '') != 'MEM':
        await validate_database_connections()
    validate_token_revocation_config()
    admin_password = os.getenv('DOORMAN_ADMIN_PASSWORD', '')
    if len(admin_password) < 12:
        raise RuntimeError(
            'DOORMAN_ADMIN_PASSWORD must be at least 12 characters. '
            'Generate a strong password: openssl rand -base64 16'
        )

    if not os.getenv('JWT_SECRET_KEY'):
        raise RuntimeError('JWT_SECRET_KEY is not configured. Set it before starting the server.')

    try:
        if os.getenv('ENV', '').lower() == 'production':
            https_only = os.getenv('HTTPS_ONLY', 'false').lower() == 'true'
            if not https_only:
                raise RuntimeError(
                    'In production (ENV=production), you must enable HTTPS_ONLY to enforce Secure cookies and CSRF validation. '
                    'TLS should be terminated at the reverse proxy (Nginx, Traefik, ALB, etc.).'
                )

            jwt_secret = os.getenv('JWT_SECRET_KEY', '')
            if jwt_secret in (
                'please-change-me',
                'test-secret-key',
                'test-secret-key-please-change',
                '',
            ):
                raise RuntimeError(
                    'In production (ENV=production), JWT_SECRET_KEY must be changed from its default value. '
                    'Generate a strong random secret (32+ characters).'
                )

            mem_or_external = os.getenv('MEM_OR_EXTERNAL', 'MEM').upper()
            if mem_or_external == 'MEM':
                num_threads = int(os.getenv('THREADS', 1))
                if num_threads > 1:
                    raise RuntimeError(
                        'In production with THREADS > 1, MEM_OR_EXTERNAL=MEM is unsafe. '
                        'Rate limiting and token revocation are not shared across workers. '
                        'Set MEM_OR_EXTERNAL=REDIS with REDIS_HOST configured.'
                    )
                gateway_logger.warning(
                    'Production deployment with MEM_OR_EXTERNAL=MEM detected. '
                    'Single-node only. For multi-node HA, use REDIS or EXTERNAL mode.'
                )
            else:
                redis_host = os.getenv('REDIS_HOST')
                if not redis_host:
                    raise RuntimeError(
                        'In production with MEM_OR_EXTERNAL=REDIS/EXTERNAL, REDIS_HOST is required. '
                        'Redis is essential for shared token revocation and rate limiting in HA deployments.'
                    )

            # Platform CORS removed - CORS is enforced at the API level

            token_encryption_key = os.getenv('TOKEN_ENCRYPTION_KEY', '')
            if not token_encryption_key or len(token_encryption_key) < 32:
                gateway_logger.warning(
                    'Production deployment without TOKEN_ENCRYPTION_KEY (32+ characters). '
                    'API keys will not be encrypted at rest. Highly recommended for production security.'
                )

            if mem_or_external == 'MEM':
                mem_encryption_key = os.getenv('MEM_ENCRYPTION_KEY', '')
                if not mem_encryption_key or len(mem_encryption_key) < 32:
                    raise RuntimeError(
                        'In production (ENV=production) with MEM_OR_EXTERNAL=MEM, MEM_ENCRYPTION_KEY is required (32+ characters). '
                        'Memory dumps contain sensitive data and must be encrypted. '
                        'Generate a strong random key: openssl rand -hex 32'
                    )
    except Exception:
        raise
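    # A minimal production environment that satisfies the checks above might
    # look like the following (illustrative values only; generate your own secrets):
    #   ENV=production
    #   HTTPS_ONLY=true
    #   JWT_SECRET_KEY=$(openssl rand -base64 32)
    #   DOORMAN_ADMIN_PASSWORD=$(openssl rand -base64 16)
    #   MEM_OR_EXTERNAL=REDIS  REDIS_HOST=redis
    #   TOKEN_ENCRYPTION_KEY=$(openssl rand -hex 32)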

    mem_or_external = os.getenv('MEM_OR_EXTERNAL', 'MEM').upper()
    redis_host = os.getenv('REDIS_HOST')
    redis_port = os.getenv('REDIS_PORT')
    redis_db = os.getenv('REDIS_DB')
    redis_password = os.getenv('REDIS_PASSWORD', '')

    if mem_or_external in ('REDIS', 'EXTERNAL'):
        if not redis_password:
            gateway_logger.warning(
                'Redis password not set; connection may be unauthenticated. '
                'Set REDIS_PASSWORD environment variable to secure Redis access.'
            )
        host = redis_host or 'localhost'
        port = redis_port or '6379'
        db = redis_db or '0'
        if redis_password:
            redis_url = f'redis://:{redis_password}@{host}:{port}/{db}'
        else:
            redis_url = f'redis://{host}:{port}/{db}'
        app.state.redis = Redis.from_url(redis_url, decode_responses=True)
    else:
        app.state.redis = None
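    # The URL built above takes one of two shapes, e.g. (hypothetical host):
    #   redis://cache.internal:6379/0           (no password)
    #   redis://:s3cret@cache.internal:6379/0   (REDIS_PASSWORD set)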

    app.state._purger_task = asyncio.create_task(automatic_purger(1800))

    METRICS_FILE = os.path.join(LOGS_DIR, 'metrics.json')
    try:
        metrics_store.load_from_file(METRICS_FILE)
    except Exception as e:
        gateway_logger.debug(f'Metrics restore skipped: {e}')

    ENHANCED_METRICS_FILE = os.path.join(LOGS_DIR, 'enhanced_metrics.json')
    try:
        enhanced_metrics_store.load_from_file(ENHANCED_METRICS_FILE)
    except Exception as e:
        gateway_logger.debug(f'Enhanced metrics restore skipped: {e}')

    async def _metrics_autosave(interval_s: int = 60):
        while True:
            try:
                await asyncio.sleep(interval_s)
                metrics_store.save_to_file(METRICS_FILE)
                enhanced_metrics_store.save_to_file(ENHANCED_METRICS_FILE)
            except asyncio.CancelledError:
                break
            except Exception:
                pass

    try:
        app.state._metrics_save_task = asyncio.create_task(_metrics_autosave(60))
    except Exception:
        app.state._metrics_save_task = None

    try:
        await load_settings()
        await start_auto_save_task()
    except Exception as e:
        gateway_logger.error(f'Failed to initialize security settings auto-save: {e}')

    try:
        settings = get_cached_settings()
        if bool(settings.get('trust_x_forwarded_for')) and not (
            settings.get('xff_trusted_proxies') or []
        ):
            gateway_logger.warning(
                'Security: trust_x_forwarded_for enabled but xff_trusted_proxies is empty; header spoofing risk. Configure trusted proxy IPs/CIDRs.'
            )

            if os.getenv('ENV', '').lower() == 'production':
                raise RuntimeError(
                    'Production deployment with trust_x_forwarded_for requires xff_trusted_proxies '
                    'to prevent IP spoofing. Configure trusted proxy IPs/CIDRs via /platform/security endpoint.'
                )
    except Exception as e:
        if isinstance(e, RuntimeError):
            raise
        gateway_logger.debug(f'Startup security checks skipped: {e}')

    try:
        app.openapi()
        problems = []
        for route in app.routes:
            path = getattr(route, 'path', '')
            if not path.startswith(('/platform', '/api')):
                continue
            include = getattr(route, 'include_in_schema', True)
            methods = set(getattr(route, 'methods', set()) or [])
            if not include or 'OPTIONS' in methods:
                continue
            if not getattr(route, 'description', None):
                problems.append(f'Route {path} missing description')
            if not getattr(route, 'response_model', None):
                problems.append(f'Route {path} missing response_model')
        if problems:
            gateway_logger.info('OpenAPI lint: \n' + '\n'.join(problems[:50]))
    except Exception as e:
        gateway_logger.debug(f'OpenAPI lint skipped: {e}')

    # Start analytics background scheduler (aggregation, persistence)
    try:
        await analytics_scheduler.start()
        app.state._analytics_scheduler_started = True
    except Exception as e:
        logging.getLogger('doorman.analytics').warning(
            f'Analytics scheduler start failed: {e}'
        )

    try:
        if database.memory_only:
            settings = get_cached_settings()
            hint = settings.get('dump_path')
            latest_path = find_latest_dump_path(hint)
            if latest_path and os.path.exists(latest_path):
                info = restore_memory_from_file(latest_path)
                gateway_logger.info(
                    f'Memory mode: restored from dump {latest_path} (created_at={info.get("created_at")})'
                )
            else:
                gateway_logger.info('Memory mode: no existing dump found to restore')
    except Exception as e:
        gateway_logger.error(f'Memory mode restore failed: {e}')

    # Optional: in-process demo seeding for MEM mode
    try:
        if database.memory_only and str(os.getenv('DEMO_SEED', 'false')).lower() in (
            '1', 'true', 'yes', 'on'
        ):
            from utils.demo_seed_util import run_seed as _run_seed_demo

            def _int_env(name: str, default: int) -> int:
                try:
                    return int(os.getenv(name, default))
                except Exception:
                    return default

            users = _int_env('DEMO_USERS', 40)
            apis = _int_env('DEMO_APIS', 15)
            endpoints = _int_env('DEMO_ENDPOINTS', 6)
            groups = _int_env('DEMO_GROUPS', 8)
            protos = _int_env('DEMO_PROTOS', 6)
            logs = _int_env('DEMO_LOGS', 1500)
            gateway_logger.info(
                f'DEMO_SEED enabled. Seeding in-memory store (users={users}, apis={apis}, endpoints={endpoints}, groups={groups}, protos={protos}, logs={logs})'
            )
            try:
                await asyncio.to_thread(
                    _run_seed_demo,
                    users,
                    apis,
                    endpoints,
                    groups,
                    protos,
                    logs,
                    None,
                )
            except Exception as _se:
                gateway_logger.warning(f'DEMO_SEED failed: {_se}')
    except Exception:
        pass
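    # Example invocation (assumes MEM mode):
    #   DEMO_SEED=true DEMO_USERS=10 DEMO_APIS=5
    # seeds a small in-memory dataset at startup; unset values fall back to
    # the defaults above (40 users, 15 APIs, 6 endpoints, ...).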

    try:
        loop = asyncio.get_event_loop()

        async def _perform_dump(reason: str):
            """Write a memory dump if in memory-only mode and key is configured.

            Uses a simple guard to avoid duplicate dumps when multiple signals fire.
            """
            try:
                if getattr(app.state, '_mem_dumping', False):
                    return
                app.state._mem_dumping = True
                if not database.memory_only:
                    gateway_logger.info(f'{reason}: ignored (not in memory-only mode)')
                    return
                if not os.getenv('MEM_ENCRYPTION_KEY'):
                    gateway_logger.error(f'{reason}: MEM_ENCRYPTION_KEY not configured; dump skipped')
                    return
                settings = get_cached_settings()
                path_hint = settings.get('dump_path')
                dump_path = await asyncio.to_thread(dump_memory_to_file, path_hint)
                gateway_logger.info(f'{reason}: memory dump written to {dump_path}')
            except Exception as e:
                gateway_logger.error(f'{reason}: memory dump failed: {e}')
            finally:
                try:
                    app.state._mem_dumping = False
                except Exception:
                    pass

        if hasattr(signal, 'SIGUSR1'):
            loop.add_signal_handler(
                signal.SIGUSR1, lambda: asyncio.create_task(_perform_dump('SIGUSR1'))
            )
            gateway_logger.info('SIGUSR1 handler registered for on-demand memory dumps')
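            # With this handler registered, an operator can request an
            # encrypted dump without stopping the server, e.g.:
            #   kill -USR1 "$(cat doorman.pid)"
            # (assumes a PID file matching PID_FILE was written at startup
            # and that MEM_ENCRYPTION_KEY is set).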

        # Also try to dump on SIGTERM/SIGINT, then forward the signal so Uvicorn can shut down
        def _register_forwarding_signal(sig: signal.Signals, reason: str):
            async def _handler():
                # Catch BaseException to avoid noisy "Task exception was never retrieved"
                try:
                    try:
                        await _perform_dump(reason)
                    finally:
                        try:
                            # Remove our handler to avoid recursion, then re-send
                            loop.remove_signal_handler(sig)
                        except Exception:
                            pass
                        try:
                            os.kill(os.getpid(), sig)
                        except BaseException:
                            # Suppress KeyboardInterrupt/Cancel noise in this task
                            pass
                except BaseException:
                    # Ensure no unhandled BaseException bubbles from the task
                    pass

            loop.add_signal_handler(sig, lambda: asyncio.create_task(_handler()))

        # Let Uvicorn handle SIGINT/SIGTERM for clean shutdown; we write a final
        # dump in the lifespan 'finally' block. Keep only SIGUSR1 for ad-hoc dumps.
    except (NotImplementedError, RuntimeError, AttributeError):
        # Signal handlers may be unsupported on some platforms or when no running loop exists
        pass

    try:
        if hasattr(signal, 'SIGHUP'):
            loop = asyncio.get_event_loop()

            async def _sighup_reload():
                try:
                    gateway_logger.info('SIGHUP received: reloading configuration...')

                    hot_config.reload()

                    log_level = hot_config.get('LOG_LEVEL', 'INFO')
                    try:
                        numeric_level = getattr(logging, log_level.upper(), logging.INFO)
                        logging.getLogger('doorman.gateway').setLevel(numeric_level)
                        gateway_logger.info(f'Log level updated to {log_level}')
                    except Exception as e:
                        gateway_logger.error(f'Failed to update log level: {e}')

                    gateway_logger.info('Configuration reload complete')
                except Exception as e:
                    gateway_logger.error(f'SIGHUP reload failed: {e}', exc_info=True)

            loop.add_signal_handler(signal.SIGHUP, lambda: asyncio.create_task(_sighup_reload()))
            gateway_logger.info('SIGHUP handler registered for configuration hot reload')
    except (NotImplementedError, AttributeError):
        gateway_logger.debug('SIGHUP not supported on this platform')
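    # Example: after editing the hot-reloadable config, send
    #   kill -HUP "$(cat doorman.pid)"
    # to re-read it without a restart (POSIX only; see the platform guard above).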

    try:
        yield
    finally:
        gateway_logger.info('Starting graceful shutdown...')
        app.state.shutting_down = True
        # Immediate memory dump on shutdown initiation (Ctrl+C/SIGTERM)
        immediate_dump_ok = False
        try:
            if database.memory_only:
                settings = get_cached_settings()
                path = settings.get('dump_path')
                await asyncio.to_thread(dump_memory_to_file, path)
                gateway_logger.info(f'Immediate memory dump written to {path}')
                immediate_dump_ok = True
        except Exception as e:
            gateway_logger.error(f'Failed to write immediate memory dump: {e}')
        gateway_logger.info('Waiting for in-flight requests to complete (5s grace period)...')
        await asyncio.sleep(5)
        try:
            await stop_auto_save_task()
        except Exception as e:
            gateway_logger.error(f'Failed to stop auto-save task: {e}')
        # Optionally write a final dump if immediate failed
        if not immediate_dump_ok:
            try:
                if database.memory_only:
                    settings = get_cached_settings()
                    path = settings.get('dump_path')
                    await asyncio.to_thread(dump_memory_to_file, path)
                    gateway_logger.info(f'Final memory dump written to {path}')
            except Exception as e:
                gateway_logger.error(f'Failed to write final memory dump: {e}')
        try:
            # Stop analytics scheduler if started
            if getattr(app.state, '_analytics_scheduler_started', False):
                try:
                    await analytics_scheduler.stop()
                except Exception:
                    pass
            task = getattr(app.state, '_purger_task', None)
            if task:
                task.cancel()
        except Exception:
            pass
        try:
            gateway_logger.info('Closing database connections...')
            from utils.database import close_database_connections

            close_database_connections()
        except Exception as e:
            gateway_logger.error(f'Error closing database connections: {e}')
        try:
            gateway_logger.info('Closing HTTP clients...')
            from services.gateway_service import GatewayService

            if hasattr(GatewayService, '_http_client') and GatewayService._http_client:
                await GatewayService._http_client.aclose()
                gateway_logger.info('HTTP client closed')
        except Exception as e:
            gateway_logger.error(f'Error closing HTTP client: {e}')

        try:
            METRICS_FILE = os.path.join(LOGS_DIR, 'metrics.json')
            metrics_store.save_to_file(METRICS_FILE)
            ENHANCED_METRICS_FILE = os.path.join(LOGS_DIR, 'enhanced_metrics.json')
            enhanced_metrics_store.save_to_file(ENHANCED_METRICS_FILE)
        except Exception:
            pass

        gateway_logger.info('Graceful shutdown complete')
        try:
            t = getattr(app.state, '_metrics_save_task', None)
            if t:
                t.cancel()
        except Exception:
            pass

        try:
            from services.gateway_service import GatewayService as _GS

            if os.getenv('ENABLE_HTTPX_CLIENT_CACHE', 'true').lower() != 'false':
                try:
                    import asyncio as _asyncio

                    if _asyncio.iscoroutinefunction(_GS.aclose_http_client):
                        await _GS.aclose_http_client()
                except Exception:
                    pass
        except Exception:
            pass


def _generate_unique_id(route: APIRoute) -> str:
    try:
        name = getattr(route, 'name', 'op') or 'op'
        path = getattr(route, 'path', '').replace('/', '_').replace('{', '').replace('}', '')
        methods = '_'.join(sorted(list(getattr(route, 'methods', []) or [])))
        return f'{name}_{methods}_{path}'.lower()
    except Exception:
        return (getattr(route, 'name', 'op') or 'op').lower()
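# For example, a route named 'get_user' with methods {'GET'} and path
# '/platform/user/{user_id}' produces the operation id
# 'get_user_get__platform_user_user_id'.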


# Custom Swagger UI CSS for light/dark mode
SWAGGER_CUSTOM_CSS = r"""
/* Improve readability for dark mode (high contrast, clear borders) */
:root { color-scheme: light dark; }

/* Shared dark-mode rule-set, scoped under a .swagger-dark helper class.
   Because :has() support is inconsistent across target browsers, the same
   rules are duplicated below under an OS dark-mode media query (scoped to
   the default #swagger-ui container) and under an app-level
   html.dark/body.dark theme toggle. */
.swagger-dark {
  /* Background */
  background-color: #0d1117 !important;
}

/* Force high-contrast text by default; ensures any hardcoded dark grays
   (e.g., #3b4151) become white */
.swagger-dark .swagger-ui,
.swagger-dark .swagger-ui * {
  color: #ffffff !important;
}

.swagger-dark .swagger-ui p,
.swagger-dark .swagger-ui div,
.swagger-dark .swagger-ui span,
.swagger-dark .swagger-ui a,
.swagger-dark .swagger-ui li,
.swagger-dark .swagger-ui h1,
.swagger-dark .swagger-ui h2,
.swagger-dark .swagger-ui h3,
.swagger-dark .swagger-ui h4,
.swagger-dark .swagger-ui h5,
.swagger-dark .swagger-ui h6 { color: #ffffff !important; }

.swagger-dark .swagger-ui .renderedMarkdown,
.swagger-dark .swagger-ui .renderedMarkdown *,
.swagger-dark .swagger-ui .markdown,
.swagger-dark .swagger-ui .markdown *,
.swagger-dark .swagger-ui .info,
.swagger-dark .swagger-ui .info *,
.swagger-dark .swagger-ui .opblock-summary-path,
.swagger-dark .swagger-ui .opblock-summary-description,
.swagger-dark .swagger-ui .opblock-summary-method,
.swagger-dark .swagger-ui .model,
.swagger-dark .swagger-ui .model *,
.swagger-dark .swagger-ui .parameters,
.swagger-dark .swagger-ui .parameters *,
.swagger-dark .swagger-ui .responses-wrapper,
.swagger-dark .swagger-ui .responses-wrapper *,
.swagger-dark .swagger-ui .response-col_description,
.swagger-dark .swagger-ui .response-col_description *,
.swagger-dark .swagger-ui .response-col_status { color: #ffffff !important; }

/* Icons and svgs */
.swagger-dark .swagger-ui svg,
.swagger-dark .swagger-ui svg * { fill: #e5e7eb !important; stroke: #e5e7eb !important; }

/* Top bar */
.swagger-dark .swagger-ui .topbar { background-color: #0d1117 !important; border-bottom: 1px solid #1f2937 !important; }
.swagger-dark .swagger-ui .topbar .download-url-wrapper .select-label select { background:#0b1220 !important; color:#e6edf3 !important; border-color:#334155 !important; }

/* General panels */
.swagger-dark .swagger-ui .scheme-container,
.swagger-dark .swagger-ui .information-container,
.swagger-dark .swagger-ui .opblock,
.swagger-dark .swagger-ui .model,
.swagger-dark .swagger-ui .model-box,
.swagger-dark .swagger-ui .parameters,
.swagger-dark .swagger-ui .responses-wrapper,
.swagger-dark .swagger-ui .response-col_description__inner,
.swagger-dark .swagger-ui .table-container {
  background: #0f172a !important;
  border-color: #1f2937 !important;
  color: #ffffff !important;
}

/* Section headers */
.swagger-dark .swagger-ui .opblock .opblock-section-header,
.swagger-dark .swagger-ui .opblock-summary {
  background: #0b1220 !important;
  border-color: #1f2937 !important;
  color: #ffffff !important;
}
.swagger-dark .swagger-ui .opblock-tag { color:#ffffff !important; }

/* Inputs and selects */
.swagger-dark .swagger-ui select,
.swagger-dark .swagger-ui input,
.swagger-dark .swagger-ui textarea {
  background:#0b1220 !important;
  color:#ffffff !important;
  border: 1px solid #334155 !important;
}
.swagger-dark .swagger-ui ::placeholder { color:#94a3b8 !important; opacity:1; }

/* Tables */
.swagger-dark .swagger-ui table thead tr th { color:#ffffff !important; background:#0b1220 !important; border-color:#1f2937 !important; }
.swagger-dark .swagger-ui table tbody tr { background:#0f172a !important; border-color:#1f2937 !important; }
.swagger-dark .swagger-ui table tbody tr td { color:#ffffff !important; }
.swagger-dark .swagger-ui table tbody tr:nth-child(2n) { background:#111827 !important; }

/* Code blocks */
.swagger-dark .swagger-ui .markdown,
.swagger-dark .swagger-ui .markdown p,
.swagger-dark .swagger-ui .markdown li,
.swagger-dark .swagger-ui .model-title,
.swagger-dark .swagger-ui .parameter__name,
.swagger-dark .swagger-ui .parameter__type,
.swagger-dark .swagger-ui .parameter__in,
.swagger-dark .swagger-ui .response-col_status,
.swagger-dark .swagger-ui .opblock-summary-description,
.swagger-dark .swagger-ui .model .property,
.swagger-dark .swagger-ui .markdown code,
.swagger-dark .swagger-ui code,
.swagger-dark .swagger-ui pre {
  color:#ffffff !important;
  background:#0b1220 !important;
  border: 1px solid #1f2937 !important;
}

/* Links and actions */
.swagger-dark .swagger-ui .info a,
.swagger-dark .swagger-ui a { color:#bfdbfe !important; }
.swagger-dark .swagger-ui a:hover { color:#dbeafe !important; text-decoration: underline !important; }

/* Buttons */
.swagger-dark .swagger-ui .btn,
.swagger-dark .swagger-ui .btn.execute {
  background:#2563eb !important;
  color:#ffffff !important;
  border-color: transparent !important;
}
.swagger-dark .swagger-ui .authorization__btn,
.swagger-dark .swagger-ui .try-out { color:#e6edf3 !important; border-color:#334155 !important; }

/* Copy icon and misc */
.swagger-dark .swagger-ui .copy-to-clipboard { filter: invert(1) hue-rotate(180deg); }

/* Apply page-level colors in OS dark mode */
@media (prefers-color-scheme: dark) {
  body { background-color: #0d1117 !important; }
  body, html { color: #ffffff !important; }
}

/* Apply when a parent sets .dark (e.g., app-level theme toggle) */
html.dark, body.dark { background-color: #0d1117 !important; }

/* Duplicate rules for OS dark mode, scoped under the default Swagger
   container id (#swagger-ui) so the overrides apply without JS or :has() */
@media (prefers-color-scheme: dark) {
  #swagger-ui { background-color: #0d1117 !important; }
  #swagger-ui .swagger-ui,
  #swagger-ui .swagger-ui * { color:#ffffff !important; }
  #swagger-ui .swagger-ui svg, #swagger-ui .swagger-ui svg * { fill:#e5e7eb !important; stroke:#e5e7eb !important; }
  #swagger-ui .swagger-ui .topbar { background-color:#0d1117 !important; border-bottom:1px solid #1f2937 !important; }
  #swagger-ui .swagger-ui .topbar .download-url-wrapper .select-label select { background:#0b1220 !important; color:#e6edf3 !important; border-color:#334155 !important; }
  #swagger-ui .swagger-ui .scheme-container,
  #swagger-ui .swagger-ui .information-container,
  #swagger-ui .swagger-ui .opblock,
  #swagger-ui .swagger-ui .model,
  #swagger-ui .swagger-ui .model-box,
  #swagger-ui .swagger-ui .parameters,
  #swagger-ui .swagger-ui .responses-wrapper,
  #swagger-ui .swagger-ui .response-col_description__inner,
  #swagger-ui .swagger-ui .table-container { background:#0f172a !important; border-color:#1f2937 !important; color:#ffffff !important; }
  #swagger-ui .swagger-ui .opblock .opblock-section-header,
  #swagger-ui .swagger-ui .opblock-summary { background:#0b1220 !important; border-color:#1f2937 !important; color:#ffffff !important; }
  #swagger-ui .swagger-ui .opblock-tag { color:#ffffff !important; }
  #swagger-ui .swagger-ui select,
  #swagger-ui .swagger-ui input,
  #swagger-ui .swagger-ui textarea { background:#0b1220 !important; color:#ffffff !important; border:1px solid #334155 !important; }
  #swagger-ui .swagger-ui ::placeholder { color:#94a3b8 !important; opacity:1; }
  #swagger-ui .swagger-ui table thead tr th { color:#ffffff !important; background:#0b1220 !important; border-color:#1f2937 !important; }
  #swagger-ui .swagger-ui table tbody tr { background:#0f172a !important; border-color:#1f2937 !important; }
  #swagger-ui .swagger-ui table tbody tr td { color:#ffffff !important; }
  #swagger-ui .swagger-ui table tbody tr:nth-child(2n) { background:#111827 !important; }
  #swagger-ui .swagger-ui .markdown,
  #swagger-ui .swagger-ui .markdown p,
  #swagger-ui .swagger-ui .markdown li,
  #swagger-ui .swagger-ui .model-title,
  #swagger-ui .swagger-ui .parameter__name,
  #swagger-ui .swagger-ui .parameter__type,
  #swagger-ui .swagger-ui .parameter__in,
  #swagger-ui .swagger-ui .response-col_status,
  #swagger-ui .swagger-ui .opblock-summary-description,
  #swagger-ui .swagger-ui .model .property,
  #swagger-ui .swagger-ui .markdown code,
  #swagger-ui .swagger-ui code,
  #swagger-ui .swagger-ui pre { color:#ffffff !important; background:#0b1220 !important; border:1px solid #1f2937 !important; }
  #swagger-ui .swagger-ui .info a,
  #swagger-ui .swagger-ui a { color:#bfdbfe !important; }
  #swagger-ui .swagger-ui a:hover { color:#dbeafe !important; text-decoration: underline !important; }
  #swagger-ui .swagger-ui .btn,
  #swagger-ui .swagger-ui .btn.execute { background:#2563eb !important; color:#ffffff !important; border-color: transparent !important; }
  #swagger-ui .swagger-ui .authorization__btn,
  #swagger-ui .swagger-ui .try-out { color:#e6edf3 !important; border-color:#334155 !important; }
  #swagger-ui .swagger-ui .copy-to-clipboard { filter: invert(1) hue-rotate(180deg); }
}

/* Duplicate rules for the app-level .dark class */
html.dark #swagger-ui,
body.dark #swagger-ui { background-color:#0d1117 !important; }
html.dark #swagger-ui .swagger-ui,
body.dark #swagger-ui .swagger-ui,
html.dark #swagger-ui .swagger-ui *,
body.dark #swagger-ui .swagger-ui * { color:#ffffff !important; }
"""

doorman = FastAPI(
    title='doorman',
    description="A lightweight API gateway for AI, REST, SOAP, GraphQL, gRPC, and WebSocket APIs — fully managed with built-in RESTful APIs for configuration and control. This is your application's gateway to the world.",
    version='1.0.0',
    lifespan=app_lifespan,
    generate_unique_id_function=_generate_unique_id,
    docs_url='/platform/docs',
    redoc_url='/platform/redoc',
    openapi_url='/platform/openapi.json',
    swagger_ui_parameters={
        'docExpansion': 'none',
        'defaultModelsExpandDepth': -1,
        'displayRequestDuration': True,
        'customCss': SWAGGER_CUSTOM_CSS,
    },
)

# Reject websocket connections unless explicitly enabled
WEBSOCKETS_ENABLED = os.getenv('WEBSOCKETS_ENABLED', 'false').lower() in ('1', 'true', 'yes', 'on')
doorman.add_middleware(WebSocketRejectMiddleware, enabled=WEBSOCKETS_ENABLED)


# Middleware to handle X-Forwarded-Proto for correct HTTPS redirects behind a reverse proxy
@doorman.middleware('http')
async def forwarded_proto_middleware(request: Request, call_next):
    """Handle the X-Forwarded-Proto header to fix FastAPI redirects behind a reverse proxy.

    When behind Nginx or another reverse proxy, FastAPI sees requests as HTTP even when they are HTTPS.
    This causes automatic trailing-slash redirects to use http:// instead of https://.
    This middleware updates the request scope to use the forwarded protocol.
    """
    forwarded_proto = request.headers.get('x-forwarded-proto')
    if forwarded_proto:
        request.scope['scheme'] = forwarded_proto
    return await call_next(request)
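# Typical reverse-proxy configuration that feeds this middleware (nginx shown
# as an illustration):
#   proxy_set_header X-Forwarded-Proto $scheme;
# Without it, a trailing-slash redirect issued behind TLS termination would
# downgrade an HTTPS client to http://.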

# Enable analytics collection middleware for request/response metrics
try:
    setup_analytics_middleware(doorman)
except Exception as e:
    logging.getLogger('doorman.analytics').warning(
        f'Failed to enable analytics middleware: {e}'
    )

# Add CORS middleware
# Starlette CORS middleware is disabled by default because platform and per-API
# CORS are enforced explicitly below. Enable only if requested via env.
if os.getenv('ENABLE_STARLETTE_CORS', 'false').lower() in ('1', 'true', 'yes', 'on'):
    doorman.add_middleware(
        CORSMiddleware,
        allow_origins=['*'],  # In production, replace with specific origins
        allow_credentials=True,
        allow_methods=['*'],
        allow_headers=['*'],  # This will include X-Requested-With
        expose_headers=[],
        max_age=600,
    )

https_only = os.getenv('HTTPS_ONLY', 'false').lower() == 'true'
domain = os.getenv('COOKIE_DOMAIN', 'localhost')

# - API gateway routes (/api/*): CORS controlled per-API in gateway routes/services


def _platform_cors_config() -> dict:
    """Compute platform CORS config from environment.

    Env vars:
    - ALLOWED_ORIGINS: CSV list or '*'
    - ALLOW_METHODS: CSV list (defaults to common methods)
    - ALLOW_HEADERS: CSV list or '*'
    - ALLOW_CREDENTIALS: true/false
    - CORS_STRICT: true/false (when true, do not echo wildcard origins with credentials)
    """
    import os as _os

    strict = _os.getenv('CORS_STRICT', 'false').lower() in ('1', 'true', 'yes', 'on')
    allowed_origins = [
        o.strip() for o in (_os.getenv('ALLOWED_ORIGINS') or '').split(',') if o.strip()
    ] or ['*']
    allow_methods = [
        m.strip()
        for m in (_os.getenv('ALLOW_METHODS') or 'GET,POST,PUT,DELETE,OPTIONS,PATCH,HEAD').split(
            ','
        )
        if m.strip()
    ]
    allow_headers_env = _os.getenv('ALLOW_HEADERS') or ''
    if allow_headers_env.strip() == '*':
        # Default to a known, minimal safe list when wildcard requested
        # Include X-Requested-With for common browser/XHR compat
        allow_headers = [
            'Accept',
            'Content-Type',
            'X-CSRF-Token',
            'Authorization',
            'X-Requested-With',
        ]
    else:
        # When not wildcard, allow a sensible default set (can be overridden by env)
        allow_headers = [h.strip() for h in allow_headers_env.split(',') if h.strip()] or [
            'Accept',
            'Content-Type',
            'X-CSRF-Token',
            'Authorization',
            'X-Requested-With',
        ]
    # Default to allowing credentials in dev to reduce setup friction; can be
    # tightened via ALLOW_CREDENTIALS=false for stricter environments.
    allow_credentials = _os.getenv('ALLOW_CREDENTIALS', 'true').lower() in (
        '1',
        'true',
        'yes',
        'on',
    )

    return {
        'strict': strict,
        'origins': allowed_origins,
        'credentials': allow_credentials,
        'methods': allow_methods,
        'headers': allow_headers,
    }
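# Example environment for a locked-down deployment (illustrative origins):
#   ALLOWED_ORIGINS=https://admin.example.com,https://ops.example.com
#   ALLOW_METHODS=GET,POST,PUT,DELETE,OPTIONS
#   ALLOW_CREDENTIALS=true
#   CORS_STRICT=true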


@doorman.middleware('http')
async def platform_cors(request: Request, call_next):
    """Platform CORS middleware - accepts all origins (API-level CORS enforced in gateway)."""
    try:
        path = str(request.url.path)
        if path.startswith('/platform/') or path == '/platform':
            cfg = _platform_cors_config()
            origin = request.headers.get('origin') or request.headers.get('Origin')
            origin_allowed = False
            if origin:
                if '*' in cfg['origins']:
                    if cfg['strict'] and cfg['credentials']:
                        try:
                            lo = origin.lower()
                            origin_allowed = (
                                lo.startswith('http://localhost')
                                or lo.startswith('https://localhost')
                                or lo.startswith('http://127.0.0.1')
                                or lo.startswith('https://127.0.0.1')
                            )
                        except Exception:
                            origin_allowed = False
                    else:
                        origin_allowed = True
                else:
                    origin_allowed = origin in cfg['origins']

            if request.method.upper() == 'OPTIONS':
                from fastapi.responses import Response as _Resp

                headers = {}
                # Platform CORS is permissive - echo origin unless strict mode blocks it
                if origin_allowed and origin:
                    headers['Access-Control-Allow-Origin'] = origin
                    headers['Vary'] = 'Origin'
                elif '*' in cfg['origins'] and not cfg['strict'] and origin:
                    # Wildcard without strict mode - echo the origin for credentials support
                    headers['Access-Control-Allow-Origin'] = origin
                    headers['Vary'] = 'Origin'
                elif '*' in cfg['origins'] and cfg['strict'] and cfg['credentials'] and origin:
                    # Strict mode with credentials - explicitly block non-localhost
                    headers['Access-Control-Allow-Origin'] = ''
                headers['Access-Control-Allow-Methods'] = ', '.join(cfg['methods'])
                headers['Access-Control-Allow-Headers'] = ', '.join(cfg['headers'])
                if cfg['credentials']:
                    headers['Access-Control-Allow-Credentials'] = 'true'
                rid = request.headers.get('x-request-id') or request.headers.get('X-Request-ID')
                if rid:
                    headers['request_id'] = rid
                return _Resp(status_code=204, headers=headers)

            response = await call_next(request)
            try:
                if cfg['credentials']:
                    response.headers['Access-Control-Allow-Credentials'] = 'true'
                # Platform CORS is permissive - echo origin unless strict mode blocks it
                if origin_allowed and origin:
                    response.headers['Access-Control-Allow-Origin'] = origin
                    response.headers['Vary'] = 'Origin'
                elif '*' in cfg['origins'] and not cfg['strict'] and origin:
                    # Wildcard without strict mode - echo the origin for credentials support
                    response.headers['Access-Control-Allow-Origin'] = origin
                    response.headers['Vary'] = 'Origin'
            except Exception:
                pass
            return response
    except Exception:
        pass
    return await call_next(request)


MAX_BODY_SIZE = int(os.getenv('MAX_BODY_SIZE_BYTES', 1_048_576))


def _get_max_body_size() -> int:
    try:
        v = os.getenv('MAX_BODY_SIZE_BYTES')
        if v is None or str(v).strip() == '':
            return MAX_BODY_SIZE
        return int(v)
    except Exception:
        return MAX_BODY_SIZE


class LimitedStreamReader:
    """
    Wrapper around the ASGI receive channel that enforces size limits on chunked requests.

    Prevents Transfer-Encoding: chunked bypass by tracking the accumulated size
    and rejecting streams that exceed the limit.
    """

    def __init__(self, receive, max_size: int):
        self.receive = receive
        self.max_size = max_size
        self.bytes_received = 0
        self.over_limit = False

    async def __call__(self):
        if self.over_limit:
            return {'type': 'http.request', 'body': b'', 'more_body': False}

        message = await self.receive()

        if message.get('type') == 'http.request':
            body = message.get('body', b'') or b''
            self.bytes_received += len(body)

            if self.bytes_received > self.max_size:
                self.over_limit = True
                return {'type': 'http.request', 'body': b'', 'more_body': False}

        return message
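# Note: once the limit is crossed, the reader reports an empty final body
# rather than raising, so the route handler sees a truncated request and the
# body_size_limit middleware below converts the condition into a 413 after
# the handler returns.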


@doorman.middleware('http')
async def body_size_limit(request: Request, call_next):
    """Enforce request body size limits to prevent DoS attacks.

    Protects against both:
    - Content-Length header (fast path)
    - Transfer-Encoding: chunked (stream enforcement)

    Default limit: 1MB (configurable via MAX_BODY_SIZE_BYTES)
    Per-API overrides: MAX_BODY_SIZE_BYTES_<API_TYPE> (e.g., MAX_BODY_SIZE_BYTES_SOAP)

    Protected paths:
    - /platform/authorization: Strict enforcement (prevent auth DoS)
    - /api/rest/*: Enforce on all requests
    - /api/soap/*: Enforce on XML/SOAP bodies
    - /api/graphql/*: Enforce on GraphQL queries
    - /api/grpc/*: Enforce on gRPC JSON payloads
    """
    try:
        if os.getenv('DISABLE_BODY_SIZE_LIMIT', 'false').lower() in ('1', 'true', 'yes', 'on'):
            return await call_next(request)
        path = str(request.url.path)

        try:
            raw_excludes = os.getenv('BODY_LIMIT_EXCLUDE_PATHS', '')
            if raw_excludes:
                excludes = [p.strip() for p in raw_excludes.split(',') if p.strip()]
                if any(
                    path == p or (p.endswith('*') and path.startswith(p[:-1])) for p in excludes
                ):
                    return await call_next(request)
        except Exception:
            pass

        if path.startswith('/platform/monitor/'):
            return await call_next(request)

        if path == '/platform/security/settings':
            try:
                return await call_next(request)
            except Exception as e:
                msg = str(e)
                if 'EndOfStream' in msg or 'No response returned' in msg:
                    try:
                        from models.response_model import ResponseModel as _RM
                        from utils.response_util import process_response as _pr

                        return _pr(
                            _RM(
                                status_code=200, message='Settings updated (middleware bypass)'
                            ).dict(),
                            'rest',
                        )
                    except Exception:
                        pass
                raise

        should_enforce = False
        default_limit = _get_max_body_size()
        limit = default_limit

        if path.startswith('/platform/authorization'):
            should_enforce = True
        elif path.startswith('/api/soap/'):
            should_enforce = True
            limit = int(os.getenv('MAX_BODY_SIZE_BYTES_SOAP', default_limit))
        elif path.startswith('/api/graphql/'):
            should_enforce = True
            limit = int(os.getenv('MAX_BODY_SIZE_BYTES_GRAPHQL', default_limit))
        elif path.startswith('/api/grpc/'):
            should_enforce = True
            limit = int(os.getenv('MAX_BODY_SIZE_BYTES_GRPC', default_limit))
        elif path.startswith('/api/rest/'):
            should_enforce = True
            limit = int(os.getenv('MAX_BODY_SIZE_BYTES_REST', default_limit))
        elif path.startswith('/api/'):
            should_enforce = True
        elif path.startswith('/platform/'):
            should_enforce = True

        if not should_enforce:
            return await call_next(request)

        cl = request.headers.get('content-length')
        transfer_encoding = request.headers.get('transfer-encoding', '').lower()

        if cl and str(cl).strip() != '':
            try:
                content_length = int(cl)
                if content_length > limit:
                    try:
                        from utils.audit_util import audit

                        audit(
                            request,
                            actor=None,
                            action='request.body_size_exceeded',
                            target=path,
                            status='blocked',
                            details={
                                'content_length': content_length,
                                'limit': limit,
                                'content_type': request.headers.get('content-type'),
                                'transfer_encoding': transfer_encoding or None,
                            },
                        )
                    except Exception:
                        pass

                    return process_response(
                        ResponseModel(
                            status_code=413,
                            error_code='REQ001',
                            error_message=f'Request entity too large (max: {limit} bytes)',
                        ).dict(),
                        'rest',
                    )
            except (ValueError, TypeError):
                pass

        if 'chunked' in transfer_encoding or not cl:
            if request.method in ('POST', 'PUT', 'PATCH'):
                wrap_allowed = True
                try:
                    env_flag = os.getenv('DISABLE_PLATFORM_CHUNKED_WRAP')
                    if isinstance(env_flag, str) and env_flag.strip() != '':
                        if env_flag.strip().lower() in ('1', 'true', 'yes', 'on'):
                            wrap_allowed = False
                        if str(path) == '/platform/authorization':
                            wrap_allowed = True
                except Exception:
                    pass

                if wrap_allowed:
                    original_receive = request.receive
                    limited_reader = LimitedStreamReader(original_receive, limit)
                    request._receive = limited_reader

                try:
                    response = await call_next(request)

                    try:
                        if wrap_allowed and (
                            limited_reader.over_limit or limited_reader.bytes_received > limit
                        ):
                            try:
                                from utils.audit_util import audit

                                audit(
                                    request,
                                    actor=None,
                                    action='request.body_size_exceeded',
                                    target=path,
                                    status='blocked',
                                    details={
                                        'bytes_received': limited_reader.bytes_received,
                                        'limit': limit,
                                        'content_type': request.headers.get('content-type'),
                                        'transfer_encoding': transfer_encoding or 'chunked',
                                    },
                                )
                            except Exception:
                                pass

                            return process_response(
                                ResponseModel(
                                    status_code=413,
                                    error_code='REQ001',
                                    error_message=f'Request entity too large (max: {limit} bytes)',
                                ).dict(),
                                'rest',
                            )
                    except Exception:
                        pass

                    return response
                except Exception:
                    try:
                        if wrap_allowed and (
                            limited_reader.over_limit or limited_reader.bytes_received > limit
                        ):
                            return process_response(
                                ResponseModel(
                                    status_code=413,
                                    error_code='REQ001',
                                    error_message=f'Request entity too large (max: {limit} bytes)',
                                ).dict(),
                                'rest',
                            )
                    except Exception:
                        pass
                    raise

        return await call_next(request)
    except Exception as e:
        try:
            from models.response_model import ResponseModel as _RM
            from utils.response_util import process_response as _pr
        except Exception:
            _RM = None
            _pr = None

        msg = str(e)
        gateway_logger.error(f'Body size limit middleware error: {msg}', exc_info=True)

        swallow = False
        try:
            if isinstance(e, RuntimeError) and 'No response returned' in msg:
                swallow = True
            else:
                try:
                    import anyio

                    if isinstance(e, getattr(anyio, 'EndOfStream', tuple())):
                        swallow = True
                except Exception:
                    pass
        except Exception:
            pass

        if swallow and _RM and _pr:
            try:
                return _pr(
                    _RM(
                        status_code=500,
                        error_code='GTW998',
                        error_message='Upstream handler failed to produce a response',
                    ).dict(),
                    'rest',
                )
            except Exception:
                pass

        raise


class PlatformCORSMiddleware:
    """ASGI-level CORS for /platform/* routes only.

    API-level CORS is enforced in gateway routes. This middleware should not
    interfere with /api/* paths. It also respects DISABLE_PLATFORM_CORS_ASGI:
    when set to true, this middleware becomes a no-op.
    """

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # If explicitly disabled, act as a passthrough
        try:
            if os.getenv('DISABLE_PLATFORM_CORS_ASGI', 'false').lower() in (
                '1',
                'true',
                'yes',
                'on',
            ):
                return await self.app(scope, receive, send)
        except Exception:
            pass

        try:
            if scope.get('type') != 'http':
                return await self.app(scope, receive, send)
            path = str(scope.get('path') or '')
            # Only handle platform routes here; leave /api/* to route handlers
            if not (path.startswith('/platform/') or path == '/platform'):
                return await self.app(scope, receive, send)

            cfg = _platform_cors_config()
            hdrs = {}
            try:
                for k, v in scope.get('headers') or []:
                    hdrs[k.decode('latin1').lower()] = v.decode('latin1')
            except Exception:
                pass
            origin = hdrs.get('origin')

            # Evaluate origin allowance based on config
            origin_allowed = False
            if origin:
                if '*' in cfg['origins']:
                    if cfg['strict'] and cfg['credentials']:
                        lo = origin.lower()
                        origin_allowed = (
                            lo.startswith('http://localhost')
                            or lo.startswith('https://localhost')
                            or lo.startswith('http://127.0.0.1')
                            or lo.startswith('https://127.0.0.1')
                        )
                    else:
                        origin_allowed = True
                else:
                    origin_allowed = origin in cfg['origins']

            if str(scope.get('method', '')).upper() == 'OPTIONS':
                headers = []
                if origin_allowed and origin:
                    headers.append((b'access-control-allow-origin', origin.encode('latin1')))
                    headers.append((b'vary', b'Origin'))
                else:
                    try:
                        if origin and '*' in cfg['origins'] and cfg['strict'] and cfg['credentials']:
                            headers.append((b'access-control-allow-origin', b''))
                    except Exception:
                        pass
                headers.append(
                    (b'access-control-allow-methods', ', '.join(cfg['methods']).encode('latin1'))
                )
                headers.append(
                    (b'access-control-allow-headers', ', '.join(cfg['headers']).encode('latin1'))
                )
                if cfg['credentials']:
                    headers.append((b'access-control-allow-credentials', b'true'))
                rid = hdrs.get('x-request-id')
                if rid:
                    headers.append((b'request_id', rid.encode('latin1')))
                await send({'type': 'http.response.start', 'status': 204, 'headers': headers})
                await send({'type': 'http.response.body', 'body': b''})
                return

            async def send_wrapper(message):
                # Do not modify non-OPTIONS responses here; platform HTTP middleware handles
                # response headers to avoid duplicates.
                await send(message)

            return await self.app(scope, receive, send_wrapper)
        except Exception:
            return await self.app(scope, receive, send)


doorman.add_middleware(PlatformCORSMiddleware)
doorman.add_middleware(GlobalLoggingMiddleware)

# Add tier-based rate limiting middleware (skip in live/test to avoid 429 floods)
try:
    from middleware.tier_rate_limit_middleware import TierRateLimitMiddleware
    import os as _os

    _skip_tier = _os.getenv('SKIP_TIER_RATE_LIMIT', '').lower() in (
        '1', 'true', 'yes', 'on'
    )
    _live = _os.getenv('DOORMAN_RUN_LIVE', '').lower() in ('1', 'true', 'yes', 'on')
    try:
        import sys as __sys

        _is_pytest = 'PYTEST_CURRENT_TEST' in _os.environ or 'pytest' in __sys.modules
    except Exception:
        _is_pytest = False
    if not (_skip_tier or _live or _is_pytest):
        doorman.add_middleware(TierRateLimitMiddleware)
        logging.getLogger('doorman.gateway').info('Tier-based rate limiting middleware enabled')
    else:
        logging.getLogger('doorman.gateway').info('Tier-based rate limiting middleware skipped')
except Exception as e:
    logging.getLogger('doorman.gateway').warning(
        f'Failed to enable tier rate limiting middleware: {e}'
    )


@doorman.middleware('http')
async def request_id_middleware(request: Request, call_next):
    try:
        from utils.correlation_util import get_correlation_id, set_correlation_id

        rid = (
            getattr(request.state, 'request_id', None)
            or get_correlation_id()
            or request.headers.get('x-request-id')
            or request.headers.get('request-id')
            or request.headers.get('X-Request-ID')
        )
        if not rid:
            rid = str(uuid.uuid4())

        # Store in state
        if not hasattr(request.state, 'request_id'):
            request.state.request_id = rid

        # Ensure correlation ID is set
        set_correlation_id(rid)

        # Optional logging
        try:
            settings = get_cached_settings()
            trust_xff = bool(settings.get('trust_x_forwarded_for', False))
            direct_ip = getattr(getattr(request, 'client', None), 'host', None)
            effective_ip = _policy_get_client_ip(request, trust_xff)
            gateway_logger.info(
                f'Entry: client_ip={direct_ip} effective_ip={effective_ip} method={request.method} path={str(request.url.path)}'
            )
        except Exception:
            pass

        response = await call_next(request)

        try:
            response.headers['X-Request-ID'] = rid
            response.headers['request_id'] = rid
        except Exception as e:
            gateway_logger.warning(f'Failed to set response headers: {str(e)}')

        return response
    except Exception as e:
        gateway_logger.error(f'Request ID middleware error: {str(e)}', exc_info=True)
        raise


@doorman.middleware('http')
async def security_headers(request: Request, call_next):
    response = await call_next(request)
    try:
        response.headers.setdefault('X-Content-Type-Options', 'nosniff')
        response.headers.setdefault('X-Frame-Options', 'DENY')
        response.headers.setdefault('Referrer-Policy', 'no-referrer')
        response.headers.setdefault(
            'Permissions-Policy', 'geolocation=(), microphone=(), camera=()'
        )

        try:
            # Relax CSP for interactive docs to allow required scripts/styles
            _path = str(getattr(getattr(request, 'url', None), 'path', '') or '')
            csp_env = os.getenv('CONTENT_SECURITY_POLICY')
            if csp_env is not None and csp_env.strip():
                csp = csp_env
            else:
                if _path.startswith('/platform/docs') or _path.startswith('/platform/redoc'):
                    # Allow Swagger/Redoc assets from jsDelivr and embedding in iframes
                    csp = (
                        "default-src 'self'; "
                        "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
                        "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
                        "img-src 'self' data: https://cdn.jsdelivr.net; "
                        "font-src 'self' data: https://cdn.jsdelivr.net; "
                        "connect-src 'self'; "
                        "frame-ancestors *; "
                        "base-uri 'self';"
                    )
                    try:
                        # Remove X-Frame-Options to allow embedding via frame-ancestors
                        if 'X-Frame-Options' in response.headers:
                            del response.headers['X-Frame-Options']
                    except Exception:
                        pass
                else:
                    csp = (
                        "default-src 'none'; "
                        "frame-ancestors 'none'; "
                        "base-uri 'none'; "
                        "form-action 'self'; "
                        "img-src 'self' data:; "
                        "connect-src 'self';"
                    )
            response.headers.setdefault('Content-Security-Policy', csp)
        except Exception:
            pass
        if os.getenv('HTTPS_ONLY', 'false').lower() == 'true':
            response.headers.setdefault(
                'Strict-Transport-Security', 'max-age=15552000; includeSubDomains; preload'
            )
    except Exception:
        pass
    return response
|
|
|
|
|
|
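# Illustrative only: replacing the built-in CSP wholesale via environment
# (the policy value is an example and is applied verbatim to every response):
#
#   CONTENT_SECURITY_POLICY="default-src 'self'" HTTPS_ONLY=true python doorman.py run
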
"""Logging configuration
|
|
|
|
Prefer file logging to LOGS_DIR/doorman.log when writable; otherwise, fall back
|
|
to console so production environments (e.g., ECS/EKS/Lambda) still capture logs.
|
|
Respects LOG_FORMAT=json|plain.
|
|
"""
|
|
|
|
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
_env_logs_dir = os.getenv('LOGS_DIR')
|
|
LOGS_DIR = (
|
|
os.path.abspath(_env_logs_dir) if _env_logs_dir else os.path.join(BASE_DIR, 'platform-logs')
|
|
)
|
|
|
|
|
|
# Build formatters
class JSONFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        request_id = getattr(record, 'request_id', 'no-request-id')
        payload = {
            'time': self.formatTime(record, '%Y-%m-%dT%H:%M:%S'),
            'name': record.name,
            'level': record.levelname,
            'request_id': request_id,
            'message': record.getMessage(),
        }
        try:
            return json.dumps(payload, ensure_ascii=False)
        except Exception:
            return f'{payload}'

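# Illustrative only: one line as emitted with LOG_FORMAT=json (the timestamp
# and request_id values are made up):
#
#   {"time": "2025-01-01T12:00:00", "name": "doorman.gateway", "level": "INFO",
#    "request_id": "7f3c9c2e", "message": "Entry: ..."}
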
_fmt_is_json = os.getenv('LOG_FORMAT', 'plain').lower() == 'json'
_file_handler = None
try:
    os.makedirs(LOGS_DIR, exist_ok=True)
    _file_handler = RotatingFileHandler(
        filename=os.path.join(LOGS_DIR, 'doorman.log'),
        maxBytes=10 * 1024 * 1024,
        backupCount=5,
        encoding='utf-8',
    )
    _file_handler.setFormatter(
        JSONFormatter()
        if _fmt_is_json
        else logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(request_id)s | %(message)s')
    )
except Exception as _e:
    # Attempt fallback to /tmp which is commonly writable on containers
    try:
        _tmp_dir = '/tmp/doorman-logs'
        os.makedirs(_tmp_dir, exist_ok=True)
        _file_handler = RotatingFileHandler(
            filename=os.path.join(_tmp_dir, 'doorman.log'),
            maxBytes=10 * 1024 * 1024,
            backupCount=5,
            encoding='utf-8',
        )
        _file_handler.setFormatter(
            JSONFormatter()
            if _fmt_is_json
            else logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(request_id)s | %(message)s')
        )
        logging.getLogger('doorman.gateway').warning(
            f'Primary LOGS_DIR={LOGS_DIR} not writable ({_e}); falling back to {_tmp_dir}'
        )
    except Exception as _e2:
        logging.getLogger('doorman.gateway').warning(
            f'File logging disabled ({_e2}); using console logging only'
        )
        _file_handler = None

# Configure all doorman loggers to use the same handler and prevent propagation
def configure_logger(logger_name: str) -> logging.Logger:
    logger = logging.getLogger(logger_name)
    log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
    numeric_level = getattr(logging, log_level, logging.INFO)
    logger.setLevel(numeric_level)
    logger.propagate = False

    for handler in logger.handlers[:]:
        logger.removeHandler(handler)

    from utils.correlation_util import RequestIdFilter

    class RedactFilter(logging.Filter):
        """Comprehensive logging redaction filter for sensitive data.

        Redacts:
        - Authorization headers (Bearer, Basic, API-Key, etc.)
        - Access/refresh tokens
        - Passwords and secrets
        - Cookies and session data
        - API keys and credentials
        - CSRF tokens
        """

        PATTERNS = [
            re.compile(r'(?i)(authorization\s*[:=]\s*)([^;\r\n]+)'),
            re.compile(r'(?i)(x-api-key\s*[:=]\s*)([^;\r\n]+)'),
            re.compile(r'(?i)(api[_-]?key\s*[:=]\s*)([^;\r\n]+)'),
            re.compile(r'(?i)(api[_-]?secret\s*[:=]\s*)([^;\r\n]+)'),
            re.compile(r'(?i)(access[_-]?token\s*["\']?\s*[:=]\s*["\']?)([^"\';\r\n\s]+)(["\']?)'),
            re.compile(r'(?i)(refresh[_-]?token\s*["\']?\s*[:=]\s*["\']?)([^"\';\r\n\s]+)(["\']?)'),
            re.compile(r'(?i)(token\s*["\']?\s*[:=]\s*["\']?)([a-zA-Z0-9_\-\.]{20,})(["\']?)'),
            re.compile(r'(?i)(password\s*["\']?\s*[:=]\s*["\']?)([^"\';\r\n]+)(["\']?)'),
            re.compile(r'(?i)(secret\s*["\']?\s*[:=]\s*["\']?)([^"\';\r\n\s]+)(["\']?)'),
            re.compile(r'(?i)(client[_-]?secret\s*["\']?\s*[:=]\s*["\']?)([^"\';\r\n\s]+)(["\']?)'),
            re.compile(r'(?i)(cookie\s*[:=]\s*)([^;\r\n]+)'),
            re.compile(r'(?i)(set-cookie\s*[:=]\s*)([^;\r\n]+)'),
            re.compile(r'(?i)(x-csrf-token\s*[:=]\s*["\']?)([^"\';\r\n\s]+)(["\']?)'),
            re.compile(r'(?i)(csrf[_-]?token\s*["\']?\s*[:=]\s*["\']?)([^"\';\r\n\s]+)(["\']?)'),
            re.compile(r'\b(eyJ[a-zA-Z0-9_\-]+\.eyJ[a-zA-Z0-9_\-]+\.[a-zA-Z0-9_\-]+)\b'),
            re.compile(r'(?i)(session[_-]?id\s*["\']?\s*[:=]\s*["\']?)([^"\';\r\n\s]+)(["\']?)'),
            re.compile(
                r'(-----BEGIN[A-Z\s]+PRIVATE KEY-----)(.*?)(-----END[A-Z\s]+PRIVATE KEY-----)',
                re.DOTALL,
            ),
        ]

        def filter(self, record: logging.LogRecord) -> bool:
            try:
                msg = str(record.getMessage())
                red = msg

                for pat in self.PATTERNS:
                    if pat.groups == 3 and pat.flags & re.DOTALL:
                        red = pat.sub(r'\1[REDACTED]\3', red)
                    elif pat.groups >= 2:
                        red = pat.sub(
                            lambda m: (
                                m.group(1)
                                + '[REDACTED]'
                                + (m.group(3) if m.lastindex and m.lastindex >= 3 else '')
                            ),
                            red,
                        )
                    else:
                        red = pat.sub('[REDACTED]', red)

                if red != msg:
                    record.msg = red
                    if hasattr(record, 'args') and record.args:
                        try:
                            if isinstance(record.args, dict):
                                record.args = {
                                    k: '[REDACTED]'
                                    if 'token' in str(k).lower()
                                    or 'password' in str(k).lower()
                                    or 'secret' in str(k).lower()
                                    or 'authorization' in str(k).lower()
                                    else v
                                    for k, v in record.args.items()
                                }
                            else:
                                # record.msg is already fully formatted; drop
                                # positional args so handlers do not re-apply
                                # them to the redacted string.
                                record.args = None
                        except Exception:
                            pass
            except Exception:
                pass
            return True

    console = logging.StreamHandler(stream=sys.stdout)
    console.setLevel(numeric_level)
    console.setFormatter(
        JSONFormatter()
        if _fmt_is_json
        else logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(request_id)s | %(message)s')
    )
    console.addFilter(RequestIdFilter())
    console.addFilter(RedactFilter())
    logger.addHandler(console)

    if _file_handler is not None:
        if not any(
            isinstance(f, logging.Filter) and hasattr(f, 'PATTERNS') for f in _file_handler.filters
        ):
            _file_handler.addFilter(RequestIdFilter())
            _file_handler.addFilter(RedactFilter())
        logger.addHandler(_file_handler)
    return logger

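# Illustrative only: what RedactFilter does to two hypothetical log lines
# (note the authorization pattern swallows everything up to end of line):
#
#   'Authorization: Bearer eyJabc...'        -> 'Authorization: [REDACTED]'
#   'login failed for password=hunter2'      -> 'login failed for password=[REDACTED]'
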
gateway_logger = configure_logger('doorman.gateway')
logging_logger = configure_logger('doorman.logging')

# Attach in-memory logging handler so logs remain queryable when file logging
# is not available (e.g., AWS deployments writing to stdout only).
try:
    from utils.memory_log import MemoryLogHandler

    _mem_enabled = os.getenv('MEMORY_LOG_ENABLED', 'true').lower() != 'false'
    if _mem_enabled:
        _lvl = os.getenv('LOG_LEVEL', 'INFO').upper()
        _num_lvl = getattr(logging, _lvl, logging.INFO)
        _mem_handler = MemoryLogHandler(level=_num_lvl)
        for _name in (
            'doorman.gateway',
            'doorman.logging',
            'doorman.analytics',
            'doorman.audit',
        ):
            try:
                _lg = logging.getLogger(_name)
                _lg.addHandler(_mem_handler)
            except Exception:
                pass
        gateway_logger.info('In-memory log handler enabled for UI log queries')
    else:
        gateway_logger.info('In-memory log handler disabled (MEMORY_LOG_ENABLED=false)')
except Exception as _e:
    try:
        gateway_logger.warning(f'Failed to enable in-memory log handler: {_e}')
    except Exception:
        pass

# Security Audit Middleware (should be close to top to catch all requests)
doorman.add_middleware(SecurityAuditMiddleware)

# Latency Injection (Chaos Mode)
doorman.add_middleware(LatencyInjectionMiddleware)

# Add GZip compression for responses above a configurable minimum size
# (default 500 bytes; see the COMPRESSION_* environment variables).
# This should be added early in the middleware stack so it compresses final responses.
try:
    compression_enabled = os.getenv('COMPRESSION_ENABLED', 'true').lower() == 'true'
    if compression_enabled:
        compression_level = int(os.getenv('COMPRESSION_LEVEL', '1'))
        compression_minimum_size = int(os.getenv('COMPRESSION_MINIMUM_SIZE', '500'))

        # Validate compression level (1-9)
        if not 1 <= compression_level <= 9:
            gateway_logger.warning(
                f'Invalid COMPRESSION_LEVEL={compression_level}. Must be 1-9. Using default: 1'
            )
            compression_level = 1

        doorman.add_middleware(
            GZipMiddleware, minimum_size=compression_minimum_size, compresslevel=compression_level
        )
        gateway_logger.info(
            f'Response compression enabled: level={compression_level}, '
            f'minimum_size={compression_minimum_size} bytes'
        )
    else:
        gateway_logger.info('Response compression disabled (COMPRESSION_ENABLED=false)')
except Exception as e:
    gateway_logger.warning(f'Failed to configure response compression: {e}. Compression disabled.')

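# Illustrative only: a typical tuning of the compression block above
# (values are examples, not recommendations):
#
#   COMPRESSION_ENABLED=true COMPRESSION_LEVEL=6 COMPRESSION_MINIMUM_SIZE=1024 \
#       python doorman.py run
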
# Ensure platform responses set Vary=Origin (and not Accept-Encoding) for CORS tests.
class _VaryOriginMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        response = await call_next(request)
        try:
            p = str(request.url.path)
            if p.startswith('/platform/'):
                # Force Vary to exactly 'Origin'
                try:
                    _ = response.headers.pop('Vary', None)
                except Exception:
                    pass
                response.headers['Vary'] = 'Origin'
        except Exception:
            pass
        return response


doorman.add_middleware(_VaryOriginMiddleware)

# Now that logging is configured, attempt to migrate any legacy 'generated/' dir
try:
    _migrate_generated_directory()
except Exception:
    # Non-fatal: migration is best-effort only
    pass

audit_logger = logging.getLogger('doorman.audit')
audit_logger.setLevel(logging.INFO)
audit_logger.propagate = False
for h in audit_logger.handlers[:]:
    audit_logger.removeHandler(h)
try:
    os.makedirs(LOGS_DIR, exist_ok=True)
    _audit_file = RotatingFileHandler(
        filename=os.path.join(LOGS_DIR, 'doorman-trail.log'),
        maxBytes=10 * 1024 * 1024,
        backupCount=5,
        encoding='utf-8',
    )
    _audit_file.setFormatter(
        JSONFormatter()
        if _fmt_is_json
        else logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    try:
        for eh in gateway_logger.handlers:
            for f in getattr(eh, 'filters', []):
                _audit_file.addFilter(f)
    except Exception:
        pass
    audit_logger.addHandler(_audit_file)
except Exception:
    console = logging.StreamHandler(stream=sys.stdout)
    console.setLevel(logging.INFO)
    console.setFormatter(
        JSONFormatter()
        if _fmt_is_json
        else logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    try:
        for eh in gateway_logger.handlers:
            for f in getattr(eh, 'filters', []):
                console.addFilter(f)
    except Exception:
        pass
    audit_logger.addHandler(console)

class Settings(BaseSettings):
    mongo_db_uri: str = os.getenv('MONGO_DB_URI')
    jwt_secret_key: str = os.getenv('JWT_SECRET_KEY')
    jwt_algorithm: str = 'HS256'
    jwt_access_token_expires: timedelta = timedelta(
        minutes=int(os.getenv('ACCESS_TOKEN_EXPIRES_MINUTES', 15))
    )
    jwt_refresh_token_expires: timedelta = timedelta(
        days=int(os.getenv('REFRESH_TOKEN_EXPIRES_DAYS', 30))
    )

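# Illustrative only: a minimal .env for the settings above (values are
# placeholders; JWT_SECRET_KEY must be a strong random string in practice):
#
#   MONGO_DB_URI=mongodb://localhost:27017/doorman
#   JWT_SECRET_KEY=change-me
#   ACCESS_TOKEN_EXPIRES_MINUTES=15
#   REFRESH_TOKEN_EXPIRES_DAYS=30
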
@doorman.middleware('http')
async def ip_filter_middleware(request: Request, call_next):
    try:
        path = str(request.url.path)
        if path == '/platform/security/settings':
            return await call_next(request)

        settings = get_cached_settings()
        wl = settings.get('ip_whitelist') or []
        bl = settings.get('ip_blacklist') or []
        trust_xff = bool(settings.get('trust_x_forwarded_for'))
        client_ip = _policy_get_client_ip(request, trust_xff)
        xff_hdr = request.headers.get('x-forwarded-for') or request.headers.get('X-Forwarded-For')

        try:
            env_flag = os.getenv('LOCAL_HOST_IP_BYPASS')
            allow_local = (
                (env_flag.lower() == 'true')
                if isinstance(env_flag, str) and env_flag.strip() != ''
                else bool(settings.get('allow_localhost_bypass'))
            )
            if allow_local:
                direct_ip = getattr(getattr(request, 'client', None), 'host', None)
                has_forward = any(
                    request.headers.get(h)
                    for h in (
                        'x-forwarded-for',
                        'x-real-ip',
                        'cf-connecting-ip',
                        'forwarded',
                    )
                )
                if direct_ip and _policy_is_loopback(direct_ip) and not has_forward:
                    return await call_next(request)
        except Exception:
            pass

        if client_ip:
            if wl and not _policy_ip_in_list(client_ip, wl):
                try:
                    audit(
                        request,
                        actor=None,
                        action='ip.global_deny',
                        target=client_ip,
                        status='blocked',
                        details={
                            'reason': 'not_in_whitelist',
                            'xff': xff_hdr,
                            'source_ip': getattr(getattr(request, 'client', None), 'host', None),
                        },
                    )
                except Exception:
                    pass
                from fastapi.responses import JSONResponse

                return JSONResponse(
                    status_code=403,
                    content={
                        'status_code': 403,
                        'error_code': 'SEC010',
                        'error_message': 'IP not allowed',
                    },
                )
            if bl and _policy_ip_in_list(client_ip, bl):
                try:
                    audit(
                        request,
                        actor=None,
                        action='ip.global_deny',
                        target=client_ip,
                        status='blocked',
                        details={
                            'reason': 'blacklisted',
                            'xff': xff_hdr,
                            'source_ip': getattr(getattr(request, 'client', None), 'host', None),
                        },
                    )
                except Exception:
                    pass
                from fastapi.responses import JSONResponse

                return JSONResponse(
                    status_code=403,
                    content={
                        'status_code': 403,
                        'error_code': 'SEC011',
                        'error_message': 'IP blocked',
                    },
                )
    except Exception:
        pass
    return await call_next(request)

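# Illustrative only: how the deny rules above behave with a hypothetical
# whitelist of ['10.0.0.0/8'] (assuming _policy_ip_in_list understands CIDR
# ranges; its implementation lives elsewhere):
#
#   request from 10.1.2.3     -> passes (matches the whitelist)
#   request from 203.0.113.9  -> 403 SEC010 'IP not allowed'
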
@doorman.middleware('http')
async def metrics_middleware(request: Request, call_next):
    start = asyncio.get_event_loop().time()

    def _parse_len(val: str | None) -> int:
        try:
            return int(val) if val is not None else 0
        except Exception:
            return 0

    # Include request headers size plus body length if present
    try:
        _req_hdr_bytes = sum(len(k) + len(v) for k, v in (request.headers or {}).items())
    except Exception:
        _req_hdr_bytes = 0
    _req_body_len = _parse_len(request.headers.get('content-length'))
    bytes_in = _req_hdr_bytes + _req_body_len
    response = None
    try:
        response = await call_next(request)
        return response
    finally:
        try:
            if str(request.url.path).startswith('/api/'):
                end = asyncio.get_event_loop().time()
                duration_ms = (end - start) * 1000.0
                status = getattr(response, 'status_code', 500) if response is not None else 500
                username = None
                api_key = None

                try:
                    from utils.auth_util import auth_required as _auth_required

                    payload = await _auth_required(request)
                    username = payload.get('sub') if isinstance(payload, dict) else None
                except Exception:
                    pass

                p = str(request.url.path)
                if p.startswith('/api/rest/'):
                    parts = p.split('/')
                    try:
                        idx = parts.index('rest')
                        api_key = (
                            f'rest:{parts[idx + 1]}'
                            if len(parts) > idx + 1 and parts[idx + 1]
                            else 'rest:unknown'
                        )
                    except ValueError:
                        api_key = 'rest:unknown'
                elif p.startswith('/api/graphql/'):
                    seg = p.rsplit('/', 1)[-1] or 'unknown'
                    api_key = f'graphql:{seg}'
                elif p.startswith('/api/soap/'):
                    seg = p.rsplit('/', 1)[-1] or 'unknown'
                    api_key = f'soap:{seg}'
                # Include response headers size plus body length if present
                clen = 0
                try:
                    headers = getattr(response, 'headers', {}) or {}
                    _resp_hdr_bytes = sum(len(k) + len(v) for k, v in headers.items())
                    clen = _parse_len(headers.get('content-length'))
                    if clen == 0:
                        # Fallback to explicit body length header set by response_util
                        clen = _parse_len(headers.get('x-body-length'))
                    if clen == 0:
                        body = getattr(response, 'body', None)
                        if isinstance(body, (bytes, bytearray)):
                            clen = len(body)
                    clen = _resp_hdr_bytes + clen
                except Exception:
                    clen = 0

                # Only record monitor metrics for API traffic; exclude platform endpoints
                if p.startswith('/api/'):
                    metrics_store.record(
                        status=status,
                        duration_ms=duration_ms,
                        username=username,
                        api_key=api_key,
                        bytes_in=bytes_in,
                        bytes_out=clen,
                    )
                    try:
                        if username:
                            from utils.bandwidth_util import _get_user, add_usage

                            u = _get_user(username)
                            if (
                                u
                                and u.get('bandwidth_limit_bytes')
                                and u.get('bandwidth_limit_enabled') is not False
                            ):
                                add_usage(
                                    username,
                                    int(bytes_in) + int(clen),
                                    u.get('bandwidth_limit_window') or 'day',
                                )
                    except Exception:
                        pass
            try:
                # Normalize platform CORS Vary header last to avoid gzip appending
                if str(request.url.path).startswith('/platform/'):
                    headers = getattr(response, 'headers', None)
                    if headers is not None:
                        try:
                            _ = headers.pop('Vary', None)
                        except Exception:
                            pass
                        headers['Vary'] = 'Origin'
            except Exception:
                pass
        except Exception:
            pass

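# Illustrative only: how bytes_in/bytes_out are derived above
# (the sizes are hypothetical):
#
#   request:  420 bytes of headers + Content-Length: 1000  -> bytes_in  = 1420
#   response: 310 bytes of headers + Content-Length: 2048  -> bytes_out = 2358
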
async def automatic_purger(interval_seconds):
    while True:
        await asyncio.sleep(interval_seconds)
        await purge_expired_tokens()
        gateway_logger.info('Expired JWTs purged from blacklist.')

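# Illustrative only: automatic_purger is a plain coroutine, so startup code
# (e.g., a lifespan hook) can schedule it; the interval is an example value:
#
#   task = asyncio.create_task(automatic_purger(3600))  # purge hourly
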
@doorman.exception_handler(JWTError)
async def jwt_exception_handler(request: Request, exc: JWTError):
    return process_response(
        ResponseModel(status_code=401, error_code='JWT001', error_message='Invalid token').dict(),
        'rest',
    )


@doorman.exception_handler(500)
async def internal_server_error_handler(request: Request, exc: Exception):
    return process_response(
        ResponseModel(
            status_code=500, error_code='ISE001', error_message='Internal Server Error'
        ).dict(),
        'rest',
    )


@doorman.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    # Log validation failures with enough detail to debug client payloads;
    # the RedactFilter on the gateway logger scrubs any sensitive values.
    gateway_logger.error(f'Validation error on {request.method} {request.url.path}')
    gateway_logger.error(f'Validation errors: {exc.errors()}')
    gateway_logger.error(f'Request body: {await request.body()}')

    return process_response(
        ResponseModel(
            status_code=422, error_code='VAL001', error_message='Validation Error'
        ).dict(),
        'rest',
    )

cache_manager.init_app(doorman)

doorman.include_router(gateway_router, prefix='/api', tags=['Gateway'])
doorman.include_router(metrics_router, tags=['Metrics'])
doorman.include_router(authorization_router, prefix='/platform', tags=['Authorization'])
doorman.include_router(user_router, prefix='/platform/user', tags=['User'])
doorman.include_router(api_router, prefix='/platform/api', tags=['API'])
doorman.include_router(endpoint_router, prefix='/platform/endpoint', tags=['Endpoint'])
doorman.include_router(group_router, prefix='/platform/group', tags=['Group'])
doorman.include_router(role_router, prefix='/platform/role', tags=['Role'])
doorman.include_router(subscription_router, prefix='/platform/subscription', tags=['Subscription'])
doorman.include_router(routing_router, prefix='/platform/routing', tags=['Routing'])
doorman.include_router(proto_router, prefix='/platform/proto', tags=['Proto'])
doorman.include_router(logging_router, prefix='/platform/logging', tags=['Logging'])
doorman.include_router(dashboard_router, prefix='/platform/dashboard', tags=['Dashboard'])
doorman.include_router(memory_router, prefix='/platform', tags=['Memory'])
doorman.include_router(security_router, prefix='/platform', tags=['Security'])
doorman.include_router(monitor_router, prefix='/platform', tags=['Monitor'])
doorman.include_router(credit_router, prefix='/platform/credit', tags=['Credit'])
doorman.include_router(demo_router, prefix='/platform/demo', tags=['Demo'])
doorman.include_router(config_router, prefix='/platform', tags=['Config'])
doorman.include_router(tools_router, prefix='/platform/tools', tags=['Tools'])
doorman.include_router(config_hot_reload_router, prefix='/platform', tags=['Config Hot Reload'])
doorman.include_router(vault_router, prefix='/platform/vault', tags=['Vault'])
doorman.include_router(analytics_router, prefix='/platform', tags=['Analytics'])
doorman.include_router(tier_router, prefix='/platform/tiers', tags=['Tiers'])
doorman.include_router(rate_limit_rule_router, prefix='/platform/rate-limits', tags=['Rate Limits'])
doorman.include_router(quota_router, prefix='/platform/quota', tags=['Quota'])
doorman.include_router(openapi_router, tags=['OpenAPI Discovery'])
doorman.include_router(wsdl_router, tags=['WSDL Discovery'])
doorman.include_router(graphql_routes_router, tags=['GraphQL'])
doorman.include_router(grpc_router, tags=['gRPC Discovery'])

def start() -> None:
    if os.path.exists(PID_FILE):
        gateway_logger.info('doorman is already running!')
        sys.exit(0)
    if os.name == 'nt':
        process = subprocess.Popen(
            [sys.executable, __file__, 'run'],
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    else:
        process = subprocess.Popen(
            [sys.executable, __file__, 'run'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            preexec_fn=os.setsid,
        )
    with open(PID_FILE, 'w') as f:
        f.write(str(process.pid))
    gateway_logger.info(f'Starting doorman with PID {process.pid}.')

def stop() -> None:
    if not os.path.exists(PID_FILE):
        gateway_logger.info('No running instance found')
        return
    with open(PID_FILE) as f:
        pid = int(f.read())
    try:
        if os.name == 'nt':
            subprocess.call(['taskkill', '/F', '/PID', str(pid)])
        else:
            os.killpg(pid, signal.SIGTERM)

        gateway_logger.info(f'Stopping doorman with PID {pid}')
        deadline = time.time() + 15
        while time.time() < deadline:
            try:
                os.kill(pid, 0)
                time.sleep(0.5)
            except (ProcessLookupError, OSError):
                # ProcessLookupError on POSIX; OSError covers platforms where
                # probing with signal 0 is unsupported (e.g., Windows).
                break
    except ProcessLookupError:
        gateway_logger.info('Process already terminated')
    finally:
        if os.path.exists(PID_FILE):
            os.remove(PID_FILE)

def restart() -> None:
    """Restart the doorman process using the PID-based supervisor.

    This function is intended to be invoked from a detached helper process.
    """
    try:
        stop()

        time.sleep(1.0)
    except Exception as e:
        gateway_logger.error(f'Error during stop phase of restart: {e}')
    try:
        start()
    except Exception as e:
        gateway_logger.error(f'Error during start phase of restart: {e}')

def run() -> None:
    server_port = int(os.getenv('PORT', 5001))
    max_threads = multiprocessing.cpu_count()
    env_threads = int(os.getenv('THREADS', max_threads))
    num_threads = min(env_threads, max_threads)

    # Hard validation: memory-only mode requires a single worker.
    # Start-up should fail fast with a clear error instead of silently
    # modifying the configured worker count.
    if database.memory_only and env_threads != 1:
        raise RuntimeError(
            'MEM_OR_EXTERNAL=MEM requires THREADS=1. '
            'Set THREADS=1 for single-process memory mode or switch to MEM_OR_EXTERNAL=REDIS for multi-worker.'
        )
    gateway_logger.info(f'Starting doorman with {num_threads} worker(s) on port {server_port}')
    gateway_logger.info(
        'TLS termination should be handled at a reverse proxy (Nginx, Traefik, ALB, etc.)'
    )
    uvicorn.run(
        'doorman:doorman',
        host='0.0.0.0',
        port=server_port,
        reload=os.getenv('DEV_RELOAD', 'false').lower() == 'true',
        reload_excludes=['venv/*', 'logs/*'],
        workers=num_threads,
        log_level='info',
    )

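# Illustrative only: worker/port configuration for run() (the values are
# examples; MEM requires a single worker, as enforced above):
#
#   PORT=5001 THREADS=4 MEM_OR_EXTERNAL=REDIS python doorman.py run
#   PORT=5001 THREADS=1 MEM_OR_EXTERNAL=MEM   python doorman.py run
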
def main() -> None:
    host = os.getenv('HOST', '0.0.0.0')
    port = int(os.getenv('PORT', '8000'))
    try:
        uvicorn.run(
            'doorman:doorman',
            host=host,
            port=port,
            reload=os.getenv('DEBUG', 'false').lower() == 'true',
        )
    except Exception as e:
        gateway_logger.error(f'Failed to start server: {str(e)}')
        raise

def seed_command() -> None:
    """Run the demo seeder from the command line."""
    import argparse

    from utils.demo_seed_util import run_seed

    parser = argparse.ArgumentParser(description='Seed the database with demo data')
    parser.add_argument(
        '--users', type=int, default=60, help='Number of users to create (default: 60)'
    )
    parser.add_argument(
        '--apis', type=int, default=20, help='Number of APIs to create (default: 20)'
    )
    parser.add_argument(
        '--endpoints', type=int, default=6, help='Number of endpoints per API (default: 6)'
    )
    parser.add_argument(
        '--groups', type=int, default=10, help='Number of groups to create (default: 10)'
    )
    parser.add_argument(
        '--protos', type=int, default=6, help='Number of proto files to create (default: 6)'
    )
    parser.add_argument(
        '--logs', type=int, default=2000, help='Number of log entries to create (default: 2000)'
    )
    parser.add_argument(
        '--seed', type=int, default=None, help='Random seed for reproducibility (optional)'
    )

    args = parser.parse_args(sys.argv[2:])  # Skip 'doorman.py' and 'seed'

    print('Starting demo seed with:')
    print(f'  Users: {args.users}')
    print(f'  APIs: {args.apis}')
    print(f'  Endpoints per API: {args.endpoints}')
    print(f'  Groups: {args.groups}')
    print(f'  Protos: {args.protos}')
    print(f'  Logs: {args.logs}')
    if args.seed is not None:
        print(f'  Random Seed: {args.seed}')
    print()

    try:
        result = run_seed(
            users=args.users,
            apis=args.apis,
            endpoints=args.endpoints,
            groups=args.groups,
            protos=args.protos,
            logs=args.logs,
            seed=args.seed,
        )
        print('\n✓ Seeding completed successfully!')
        print(f'Result: {result}')
    except Exception as e:
        print(f'\n✗ Seeding failed: {str(e)}')
        import traceback

        traceback.print_exc()
        sys.exit(1)

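# Illustrative only: a reproducible small seed for local development
# (argument values are examples):
#
#   python doorman.py seed --users 10 --apis 5 --endpoints 4 --seed 42
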
if __name__ == '__main__':
    if len(sys.argv) > 1 and sys.argv[1] == 'stop':
        stop()
    elif len(sys.argv) > 1 and sys.argv[1] == 'start':
        start()
    elif len(sys.argv) > 1 and sys.argv[1] == 'restart':
        restart()
    elif len(sys.argv) > 1 and sys.argv[1] == 'run':
        run()
    elif len(sys.argv) > 1 and sys.argv[1] == 'seed':
        seed_command()
    else:
        main()