import logging import os import re import tempfile import time import uuid from datetime import timedelta from urllib.parse import urlparse import sentry_sdk from authlib.integrations.flask_client import OAuth from dotenv import load_dotenv from flask import Flask, flash, g, jsonify, redirect, request, session, url_for from flask_babel import Babel, _ from flask_limiter import Limiter from flask_limiter.util import get_remote_address from flask_login import LoginManager from flask_migrate import Migrate from flask_socketio import SocketIO from flask_sqlalchemy import SQLAlchemy from flask_wtf.csrf import CSRFError, CSRFProtect from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest from pythonjsonlogger import jsonlogger from sentry_sdk.integrations.flask import FlaskIntegration from sqlalchemy.pool import StaticPool from werkzeug.http import parse_options_header from werkzeug.middleware.proxy_fix import ProxyFix # Load environment variables load_dotenv() # Initialize extensions db = SQLAlchemy() migrate = Migrate() login_manager = LoginManager() socketio = SocketIO() babel = Babel() csrf = CSRFProtect() limiter = Limiter(key_func=get_remote_address, default_limits=[]) oauth = OAuth() # Initialize Mail (will be configured in create_app) from flask_mail import Mail mail = Mail() # Initialize APScheduler for background tasks from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler() # Initialize Prometheus metrics REQUEST_COUNT = Counter("tt_requests_total", "Total requests", ["method", "endpoint", "http_status"]) REQUEST_LATENCY = Histogram("tt_request_latency_seconds", "Request latency seconds", ["endpoint"]) # Initialize JSON logger for structured logging json_logger = logging.getLogger("timetracker") json_logger.setLevel(logging.INFO) def log_event(name: str, **kwargs): """Log an event with structured JSON format including request context""" try: extra = {"request_id": getattr(g, "request_id", None), "event": name, **kwargs} try: from app.telemetry.otel_setup import get_trace_context_for_logs, is_otel_tracing_active if is_otel_tracing_active(): extra.update(get_trace_context_for_logs()) except Exception: pass json_logger.info(name, extra=extra) except Exception as e: logging.getLogger(__name__).debug("Structured log_event failed: %s", e) def identify_user(user_id, properties=None): """ Identify a user in the analytics backend (consent-aware). Delegates to telemetry service; only sent when detailed analytics is opted in. """ try: from app.telemetry.service import identify_user as _identify _identify(user_id, properties) except Exception as e: logging.getLogger(__name__).debug("Telemetry identify_user failed: %s", e) def track_event(user_id, event_name, properties=None): """ Track a product analytics event (consent-aware). Delegates to telemetry service; only sent when detailed analytics is opted in. """ try: from app.telemetry.service import send_analytics_event send_analytics_event(user_id, event_name, properties) except Exception: pass def track_page_view(page_name, user_id=None, properties=None): """ Track a page view event (consent-aware). Only sent when detailed analytics is opted in. """ try: if user_id is None: from flask_login import current_user if current_user.is_authenticated: user_id = current_user.id else: return page_properties = { "page_name": page_name, "$pathname": request.path if request else None, "$current_url": request.url if request else None, } if properties: page_properties.update(properties) track_event(user_id, "$pageview", page_properties) except Exception as e: logging.getLogger(__name__).debug("Telemetry track_page_view failed: %s", e) def create_app(config=None): """Application factory pattern""" app = Flask(__name__) logger = logging.getLogger(__name__) bootstrap_mode = os.getenv("TT_BOOTSTRAP_MODE", "").strip().lower() # Validate environment variables on startup (non-blocking warnings in dev, errors in prod) try: from app.utils.env_validation import validate_all is_production = os.getenv("FLASK_ENV", "production") == "production" is_valid, results = validate_all(raise_on_error=is_production) if not is_valid: if is_production: app.logger.error("Environment validation failed - see details below") else: app.logger.warning("Environment validation warnings - see details below") if results.get("warnings"): for warning in results["warnings"]: app.logger.warning(f" - {warning}") if results.get("production", {}).get("issues"): for issue in results["production"]["issues"]: if is_production: app.logger.error(f" - {issue}") else: app.logger.warning(f" - {issue}") except Exception as e: # Don't fail app startup if validation itself fails app.logger.warning(f"Environment validation check failed: {e}") # Make app aware of reverse proxy (scheme/host/port) for correct URL generation & cookies # Trust a single proxy by default; adjust via env if needed app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1) # Configuration # Load env-specific config class try: env_name = os.getenv("FLASK_ENV", "production") cfg_map = { "development": "app.config.DevelopmentConfig", "testing": "app.config.TestingConfig", "production": "app.config.ProductionConfig", } app.config.from_object(cfg_map.get(env_name, "app.config.Config")) except Exception: app.config.from_object("app.config.Config") if config: app.config.update(config) # Production safety: refuse to start with default SECRET_KEY if ( app.config.get("FLASK_ENV") == "production" and app.config.get("SECRET_KEY") == "dev-secret-key-change-in-production" ): raise ValueError( "SECRET_KEY must be set explicitly in production. " "Set the SECRET_KEY environment variable to a secure random value." ) # Special handling for SQLite in-memory DB during tests: # ensure a single shared connection so objects don't disappear after commit. try: # In tests, proactively clear POSTGRES_* env hints to avoid accidental overrides if app.config.get("TESTING"): for var in ("POSTGRES_DB", "POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_HOST", "DATABASE_URL"): try: os.environ.pop(var, None) except Exception: pass db_uri = str(app.config.get("SQLALCHEMY_DATABASE_URI", "") or "") if ( app.config.get("TESTING") and isinstance(db_uri, str) and db_uri.startswith("sqlite") and ":memory:" in db_uri ): # Use a file-based SQLite database during tests to ensure consistent behavior across contexts db_file = os.path.join(tempfile.gettempdir(), f"timetracker_pytest_{os.getpid()}.sqlite") app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{db_file}" # Also keep permissive engine options for SQLite engine_opts = dict(app.config.get("SQLALCHEMY_ENGINE_OPTIONS") or {}) engine_opts.setdefault("connect_args", {"check_same_thread": False}) app.config["SQLALCHEMY_ENGINE_OPTIONS"] = engine_opts # Avoid attribute expiration on commit during tests to keep objects usable if app.config.get("TESTING"): session_opts = dict(app.config.get("SQLALCHEMY_SESSION_OPTIONS") or {}) session_opts.setdefault("expire_on_commit", False) app.config["SQLALCHEMY_SESSION_OPTIONS"] = session_opts except Exception: # Do not fail app creation for engine option tweaks pass # All templates live in app/templates (legacy root templates/ was merged in) # Prefer Postgres if POSTGRES_* env vars are present but URL points to SQLite # BUT only if DATABASE_URL was not explicitly set to SQLite current_url = app.config.get("SQLALCHEMY_DATABASE_URI", "") explicit_database_url = os.getenv("DATABASE_URL", "") # Only auto-switch to PostgreSQL if: # 1. Not in testing mode # 2. Current URL is SQLite # 3. POSTGRES_* env vars are present # 4. DATABASE_URL was NOT explicitly set to SQLite (respect user's explicit choice) if ( not app.config.get("TESTING") and isinstance(current_url, str) and current_url.startswith("sqlite") and (os.getenv("POSTGRES_DB") or os.getenv("POSTGRES_USER") or os.getenv("POSTGRES_PASSWORD")) and not (explicit_database_url and explicit_database_url.startswith("sqlite")) ): pg_user = os.getenv("POSTGRES_USER", "timetracker") pg_pass = os.getenv("POSTGRES_PASSWORD", "timetracker") pg_db = os.getenv("POSTGRES_DB", "timetracker") pg_host = os.getenv("POSTGRES_HOST", "db") app.config["SQLALCHEMY_DATABASE_URI"] = f"postgresql+psycopg2://{pg_user}:{pg_pass}@{pg_host}:5432/{pg_db}" # Initialize extensions db.init_app(app) migrate.init_app(app, db) login_manager.init_app(app) socketio.init_app(app, cors_allowed_origins="*") oauth.init_app(app) # Fast-path for migration/bootstrap runs: # we only need config + db/migrate + models loaded. Avoid registering routes, # background jobs, integrations, etc. to keep migrations as fast and reliable # as possible during container startup. if bootstrap_mode == "migrate": try: # Ensure all model tables are registered in SQLAlchemy metadata from . import models as _models # noqa: F401 except Exception: pass return app # Initialize audit logging - register event listeners AFTER db.init_app() # Flask-SQLAlchemy uses its own session class, so we need to register with it from sqlalchemy import event from sqlalchemy.orm import Session from app.utils import audit # Register with generic SQLAlchemy Session (catches all Session instances) event.listen(Session, "before_flush", audit.receive_before_flush) event.listen(Session, "after_flush", audit.receive_after_flush) # Also register with Flask-SQLAlchemy's sessionmaker if available # Flask-SQLAlchemy creates sessions from a sessionmaker, so we register with that try: # Get the sessionmaker from Flask-SQLAlchemy if hasattr(db, "session") and hasattr(db.session, "registry"): sessionmaker = db.session.registry() if sessionmaker: # Register with the session class that the sessionmaker creates session_class = sessionmaker.class_ if session_class: event.listen(session_class, "before_flush", audit.receive_before_flush) event.listen(session_class, "after_flush", audit.receive_after_flush) logger.info( f"Registered audit logging with Flask-SQLAlchemy session class: {session_class.__name__}" ) except Exception as e: logger.debug(f"Could not register with Flask-SQLAlchemy sessionmaker: {e}") # Register with SignallingSession (Flask-SQLAlchemy 2.x) try: from flask_sqlalchemy import SignallingSession event.listen(SignallingSession, "before_flush", audit.receive_before_flush) event.listen(SignallingSession, "after_flush", audit.receive_after_flush) logger.info("Registered audit logging with Flask-SQLAlchemy SignallingSession") except (ImportError, AttributeError): pass logger.info("Audit logging event listeners registered") # OpenTelemetry (traces + OTLP metrics) — same OTLP credentials as manual log export try: from app.telemetry.otel_setup import init_opentelemetry init_opentelemetry(app) except Exception as e: logger.warning("OpenTelemetry initialization skipped: %s", e) # Initialize Settings from environment variables on startup. # Skip during bootstrap/migration runs to avoid DB access before schema exists. if bootstrap_mode != "migrate": with app.app_context(): try: from app.models import Settings # This will create Settings if it doesn't exist and initialize from .env. # The get_settings() method automatically initializes new Settings from .env. Settings.get_settings() except Exception as e: # Don't fail app startup if Settings initialization fails # (e.g., database not ready yet, migration not run) app.logger.warning(f"Could not initialize Settings from environment: {e}") # Initialize Flask-Mail from app.utils.email import init_mail init_mail(app) # Initialize and start background scheduler (disabled in tests). # Skip during bootstrap/migration runs to avoid background DB work during migrations. if bootstrap_mode != "migrate": if (not app.config.get("TESTING")) and (not scheduler.running): from app.utils.scheduled_tasks import register_scheduled_tasks scheduler.start() # Register tasks after app context is available, passing app instance with app.app_context(): register_scheduled_tasks(scheduler, app=app) # Base telemetry: send first_seen once per install (idempotent) try: from app.telemetry.service import send_base_first_seen send_base_first_seen() except Exception: pass # Only initialize CSRF protection if enabled if app.config.get("WTF_CSRF_ENABLED"): csrf.init_app(app) try: # Configure limiter defaults from config if provided default_limits = [] raw = app.config.get("RATELIMIT_DEFAULT") if raw: # support semicolon or comma separated limits parts = [p.strip() for p in str(raw).replace(",", ";").split(";") if p.strip()] if parts: default_limits = parts limiter._default_limits = default_limits # set after init limiter.init_app(app) except Exception: limiter.init_app(app) # Configure absolute translation directories before Babel init. # Do NOT run subprocess-based compilation during app creation (slow, noisy). try: translations_dirs = (app.config.get("BABEL_TRANSLATION_DIRECTORIES") or "translations").split(",") base_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) abs_dirs = [] for d in translations_dirs: d = d.strip() if not d: continue abs_dirs.append(d if os.path.isabs(d) else os.path.abspath(os.path.join(base_path, d))) if abs_dirs: app.config["BABEL_TRANSLATION_DIRECTORIES"] = os.pathsep.join(abs_dirs) # Optional: best-effort compile missing/stale .mo files (Python-only, incremental). # Disabled by default for faster startup; compile in Docker build instead. if os.getenv("TT_COMPILE_TRANSLATIONS_ON_STARTUP", "false").strip().lower() in ("1", "true", "yes"): from app.utils.i18n import ensure_translations_compiled for d in abs_dirs: ensure_translations_compiled(d) except Exception: pass # Internationalization: locale selector compatible with Flask-Babel v4+ def _select_locale(): try: # 1) User preference from DB from flask_login import current_user if current_user and getattr(current_user, "is_authenticated", False): pref = getattr(current_user, "preferred_language", None) if pref: # Normalize locale code (e.g., 'no' -> 'nb' for Norwegian) return _normalize_locale(pref) # 2) Session override (set-language route) if "preferred_language" in session: return _normalize_locale(session.get("preferred_language")) # 3) Best match with Accept-Language supported = list(app.config.get("LANGUAGES", {}).keys()) or ["en"] matched = request.accept_languages.best_match(supported) or app.config.get("BABEL_DEFAULT_LOCALE", "en") return _normalize_locale(matched) except Exception: return app.config.get("BABEL_DEFAULT_LOCALE", "en") def _normalize_locale(locale_code): """Normalize locale codes for Flask-Babel compatibility. Some locale codes need to be normalized: - 'no' -> 'nb' (Norwegian Bokmål is the standard, but we'll try 'no' first) """ if not locale_code: return "en" locale_code = locale_code.lower().strip() # Try 'no' first - if translations don't exist, Flask-Babel will fall back # If 'no' doesn't work, we can map to 'nb' as fallback # For now, keep 'no' as-is since we have translations/nb/ directory # The directory structure should match what Flask-Babel expects if locale_code == "no": # Use 'nb' for Flask-Babel (standard Norwegian Bokmål locale) # But ensure we have translations in both 'no' and 'nb' directories return "nb" return locale_code babel.init_app( app, default_locale=app.config.get("BABEL_DEFAULT_LOCALE", "en"), default_timezone=app.config.get("TZ", "Europe/Rome"), locale_selector=_select_locale, ) # Ensure gettext helpers available in Jinja try: from flask_babel import gettext as _gettext from flask_babel import ngettext as _ngettext app.jinja_env.globals.update(_=_gettext, ngettext=_ngettext) except Exception: pass # Add Python built-ins that are useful in templates app.jinja_env.globals.update(getattr=getattr) # Log effective database URL (mask password) db_url = app.config.get("SQLALCHEMY_DATABASE_URI", "") try: masked_db_url = re.sub(r"//([^:]+):[^@]+@", r"//\\1:***@", db_url) except Exception: masked_db_url = db_url app.logger.info(f"Using database URL: {masked_db_url}") # Configure login manager login_manager.login_view = "auth.login" login_manager.login_message = "Please log in to access this page." login_manager.login_message_category = "info" # Internationalization selector handled via babel.init_app(locale_selector=...) # Ensure compatibility with tests and different Flask-Login versions: # Some test suites set session['_user_id'] while Flask-Login (or vice versa) # may read 'user_id'. Mirror both keys when one is present so that # programmatic session login in tests works reliably. @app.before_request def _harmonize_login_session_keys(): try: uid = session.get("_user_id") or session.get("user_id") if uid: # Normalize to strings as Flask-Login stores ids as strings uid_str = str(uid) if session.get("_user_id") != uid_str: session["_user_id"] = uid_str if session.get("user_id") != uid_str: session["user_id"] = uid_str except Exception: # Do not block request processing on any session edge case pass # In testing, ensure that if a session user id is present but current_user # isn't populated yet, we proactively authenticate the user for this request. # This improves reliability of auth-dependent integration tests that set # session keys directly or occasionally lose the session between redirects. @app.before_request def _ensure_user_authenticated_in_tests(): try: if app.config.get("TESTING"): from flask_login import current_user, login_user from app.utils.db import safe_query if not getattr(current_user, "is_authenticated", False): uid = session.get("_user_id") or session.get("user_id") if uid: from app.models import User try: user_id_int = int(uid) except (ValueError, TypeError): user = None else: user = safe_query(lambda: User.query.get(user_id_int), default=None) if user and getattr(user, "is_active", True): login_user(user, remember=True) except Exception: # Never fail the request due to this helper pass # Register user loader @login_manager.user_loader def load_user(user_id): """Load user for Flask-Login with proper transaction error handling""" from app.models import User from app.utils.db import safe_query try: user_id_int = int(user_id) except (ValueError, TypeError): return None return safe_query(lambda: User.query.get(user_id_int), default=None) # Check if initial setup is required (skip for certain routes) @app.before_request def check_setup_required(): try: # Skip setup check in testing mode if app.config.get("TESTING"): return # Skip setup check for these routes skip_routes = [ "setup.initial_setup", "static", "auth.login", "auth.logout", "main.health_check", "main.readiness_check", ] if request.endpoint in skip_routes: return # Skip for assets and health checks if request.path.startswith("/static/") or request.path.startswith("/_"): return # API discovery and mobile login must stay JSON (not HTML redirect) during install if request.path.startswith("/api/v1/info") or request.path.startswith("/api/v1/health"): return if request.path == "/api/v1/auth/login" and request.method == "POST": return # Check if setup is complete from app.utils.installation import get_installation_config installation_config = get_installation_config() if not installation_config.is_setup_complete(): return redirect(url_for("setup.initial_setup")) except Exception: pass # Attach request ID for tracing @app.before_request def attach_request_id(): try: g.request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4()) except Exception: pass # Start timer for Prometheus metrics @app.before_request def prom_start_timer(): try: g._start_time = time.time() except Exception: pass # Request logging for /login to trace POSTs reaching the app @app.before_request def log_login_requests(): try: if request.path == "/login": app.logger.info( "%s %s from %s UA=%s", request.method, request.path, request.headers.get("X-Forwarded-For") or request.remote_addr, request.headers.get("User-Agent"), ) except Exception: pass # Record Prometheus metrics and log write operations @app.after_request def record_metrics_and_log(response): latency = time.time() - getattr(g, "_start_time", time.time()) try: # Record Prometheus metrics endpoint = request.endpoint or "unknown" REQUEST_LATENCY.labels(endpoint=endpoint).observe(latency) REQUEST_COUNT.labels(method=request.method, endpoint=endpoint, http_status=response.status_code).inc() except Exception: pass try: from app.telemetry.otel_setup import inject_traceparent_headers, record_http_server_metrics route = getattr(request.url_rule, "rule", None) or (request.endpoint or "unknown") record_http_server_metrics(request.method, route, response.status_code, latency) response = inject_traceparent_headers(response) except Exception: pass try: # Log write operations if request.method in ("POST", "PUT", "PATCH", "DELETE"): app.logger.info( "%s %s -> %s from %s", request.method, request.path, response.status_code, request.headers.get("X-Forwarded-For") or request.remote_addr, ) except Exception: pass return response # Configure session app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(seconds=int(os.getenv("PERMANENT_SESSION_LIFETIME", 86400))) # Setup logging (including JSON logging) from app.utils.setup_logging import setup_logging as _setup_logging _setup_logging(app) # Enable query logging in development mode if app.config.get("FLASK_DEBUG") or app.config.get("TESTING"): try: from app.utils.query_logging import enable_query_counting, enable_query_logging enable_query_logging(app, slow_query_threshold=0.1) enable_query_counting(app) app.logger.info("Query logging enabled (development mode)") except Exception as e: app.logger.warning(f"Could not enable query logging: {e}") # Optional performance instrumentation (slow-request log, query-count when PERF_QUERY_PROFILE=1) try: from app.utils.performance import init_performance_logging init_performance_logging(app) except Exception as e: app.logger.warning(f"Could not init performance logging: {e}") # Load analytics configuration (embedded at build time) from app.config.analytics_defaults import get_analytics_config, has_analytics_configured analytics_config = get_analytics_config() # Log analytics status (for transparency) if has_analytics_configured(): app.logger.info("TimeTracker with analytics configured (telemetry opt-in via admin dashboard)") else: app.logger.info("TimeTracker build without analytics configuration") # Initialize Sentry for error monitoring # Priority: Env var > Built-in default > Disabled sentry_dsn = analytics_config.get("sentry_dsn", "") if sentry_dsn: try: sentry_sdk.init( dsn=sentry_dsn, integrations=[FlaskIntegration()], traces_sample_rate=analytics_config.get("sentry_traces_rate", 0.0), environment=os.getenv("FLASK_ENV", "production"), release=analytics_config.get("app_version"), ) app.logger.info("Sentry error monitoring initialized") except Exception as e: app.logger.warning(f"Failed to initialize Sentry: {e}") # Fail-fast on weak/missing secret in production # Skip validation in testing or debug mode is_testing = app.config.get("TESTING", False) # Check both config and environment variable for FLASK_ENV flask_env_config = app.config.get("FLASK_ENV") flask_env_env = os.getenv("FLASK_ENV", "production") flask_env = flask_env_config if flask_env_config else flask_env_env is_production_env = flask_env == "production" and not is_testing if not app.debug and is_production_env: secret = app.config.get("SECRET_KEY") placeholder_values = { "dev-secret-key-change-in-production", "your-secret-key-change-this", "your-secret-key-here", } if (not secret) or (secret in placeholder_values) or (isinstance(secret, str) and len(secret) < 32): app.logger.error("Invalid SECRET_KEY configured in production; refusing to start") raise RuntimeError("Invalid SECRET_KEY in production") # Check for debug mode in production - this is a security risk flask_debug = app.config.get("FLASK_DEBUG", False) if flask_debug or app.debug: app.logger.error("Debug mode is enabled in production; refusing to start") app.logger.error("Debug mode can leak sensitive information and should never be enabled in production") raise RuntimeError("Debug mode cannot be enabled in production") # Apply security headers and a basic CSP @app.after_request def apply_security_headers(response): try: headers = app.config.get("SECURITY_HEADERS", {}) or {} for k, v in headers.items(): # do not overwrite existing header if already present if not response.headers.get(k): response.headers[k] = v # Minimal CSP allowing our own resources and common CDNs used in templates if not response.headers.get("Content-Security-Policy"): csp = ( "default-src 'self'; " "img-src 'self' data: https:; " "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://cdnjs.cloudflare.com https://fonts.googleapis.com https://cdn.datatables.net https://uicdn.toast.com; " "font-src 'self' https://fonts.gstatic.com https://cdnjs.cloudflare.com data:; " "script-src 'self' 'unsafe-inline' https://code.jquery.com https://cdn.datatables.net https://cdnjs.cloudflare.com https://cdn.jsdelivr.net https://esm.sh https://uicdn.toast.com; " "connect-src 'self' ws: wss: https://cdn.jsdelivr.net https://cdnjs.cloudflare.com; " "frame-ancestors 'none'" ) response.headers["Content-Security-Policy"] = csp # Additional privacy headers if not response.headers.get("Referrer-Policy"): response.headers["Referrer-Policy"] = "no-referrer" if not response.headers.get("Permissions-Policy"): response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()" except Exception: pass # CSRF cookie/token handling # If CSRF is enabled, ensure CSRF cookie exists for HTML GET responses # If CSRF is disabled, explicitly clear any existing CSRF cookie to avoid confusion if app.config.get("WTF_CSRF_ENABLED"): try: # Only for safe, HTML page responses if request.method == "GET": content_type = response.headers.get("Content-Type", "") if isinstance(content_type, str) and content_type.startswith("text/html"): cookie_name = app.config.get("CSRF_COOKIE_NAME", "XSRF-TOKEN") has_cookie = bool(request.cookies.get(cookie_name)) if not has_cookie: # Generate a CSRF token and set cookie using same settings as /auth/csrf-token try: from flask_wtf.csrf import generate_csrf token = generate_csrf() except Exception: token = "" cookie_secure = bool( app.config.get( "CSRF_COOKIE_SECURE", app.config.get("SESSION_COOKIE_SECURE", False), ) ) cookie_httponly = bool(app.config.get("CSRF_COOKIE_HTTPONLY", False)) cookie_samesite = app.config.get("CSRF_COOKIE_SAMESITE", "Lax") cookie_domain = app.config.get("CSRF_COOKIE_DOMAIN") or None cookie_path = app.config.get("CSRF_COOKIE_PATH", "/") try: max_age = int(app.config.get("WTF_CSRF_TIME_LIMIT", 3600)) except Exception: max_age = 3600 response.set_cookie( cookie_name, token or "", max_age=max_age, secure=cookie_secure, httponly=cookie_httponly, samesite=cookie_samesite, domain=cookie_domain, path=cookie_path, ) except Exception: pass else: try: cookie_name = app.config.get("CSRF_COOKIE_NAME", "XSRF-TOKEN") if request.cookies.get(cookie_name): # Clear the cookie by setting it expired response.set_cookie( cookie_name, "", max_age=0, expires=0, path=app.config.get("CSRF_COOKIE_PATH", "/"), domain=app.config.get("CSRF_COOKIE_DOMAIN") or None, secure=bool( app.config.get("CSRF_COOKIE_SECURE", app.config.get("SESSION_COOKIE_SECURE", False)) ), httponly=bool(app.config.get("CSRF_COOKIE_HTTPONLY", False)), samesite=app.config.get("CSRF_COOKIE_SAMESITE", "Lax"), ) except Exception: pass return response # CSRF error handler with HTML-friendly fallback @app.errorhandler(CSRFError) def handle_csrf_error(e): # Prefer HTML flow for classic form posts regardless of Accept header quirks try: mimetype, _ = parse_options_header(request.headers.get("Content-Type", "")) is_classic_form = mimetype in ("application/x-www-form-urlencoded", "multipart/form-data") except Exception: is_classic_form = False # Log details for diagnostics try: try: from flask_login import current_user as _cu user_id = getattr(_cu, "id", None) if getattr(_cu, "is_authenticated", False) else None except Exception: user_id = None app.logger.warning( "CSRF failure: path=%s method=%s form=%s json=%s ref=%s user=%s reason=%s", request.path, request.method, bool(request.form), request.is_json, request.referrer, user_id, getattr(e, "description", ""), ) except Exception: pass if request.method == "POST" and (is_classic_form or (request.form and not request.is_json)): try: flash(_("Your session expired or the page was open too long. Please try again."), "warning") except Exception: flash("Your session expired or the page was open too long. Please try again.", "warning") # Redirect back to a safe same-origin referrer if available, else to dashboard dest = url_for("main.dashboard") try: ref = request.referrer if ref: ref_host = urlparse(ref).netloc cur_host = urlparse(request.host_url).netloc if ref_host and ref_host == cur_host: dest = ref except Exception: pass return redirect(dest) # JSON/XHR fall-through try: wants_json = ( request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest" or request.accept_mimetypes["application/json"] >= request.accept_mimetypes["text/html"] ) except Exception: wants_json = False if wants_json: return jsonify(error="csrf_token_missing_or_invalid"), 400 # Default to HTML-friendly behavior try: flash(_("Your session expired or the page was open too long. Please try again."), "warning") except Exception: flash("Your session expired or the page was open too long. Please try again.", "warning") dest = url_for("main.dashboard") try: ref = request.referrer if ref: ref_host = urlparse(ref).netloc cur_host = urlparse(request.host_url).netloc if ref_host and ref_host == cur_host: dest = ref except Exception: pass return redirect(dest) # Expose csrf_token() in Jinja templates even without FlaskForm # Always inject the function, but return empty string when CSRF is disabled @app.context_processor def inject_csrf_token(): def get_csrf_token(): # Return empty string if CSRF is disabled if not app.config.get("WTF_CSRF_ENABLED"): return "" try: from flask_wtf.csrf import generate_csrf return generate_csrf() except Exception: return "" return dict(csrf_token=get_csrf_token) # CSRF token refresh endpoint (GET) @app.route("/auth/csrf-token", methods=["GET"]) def get_csrf_token(): # If CSRF is disabled, return empty token if not app.config.get("WTF_CSRF_ENABLED"): resp = jsonify(csrf_token="", csrf_enabled=False) resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" return resp try: from flask_wtf.csrf import generate_csrf token = generate_csrf() except Exception: token = "" resp = jsonify(csrf_token=token, csrf_enabled=True) try: resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" except Exception: pass # Also set/update a CSRF cookie for double-submit pattern and SPA helpers try: cookie_name = app.config.get("CSRF_COOKIE_NAME", "XSRF-TOKEN") # Derive defaults from session cookie flags if not explicitly set cookie_secure = bool( app.config.get( "CSRF_COOKIE_SECURE", app.config.get("SESSION_COOKIE_SECURE", False), ) ) cookie_httponly = bool(app.config.get("CSRF_COOKIE_HTTPONLY", False)) cookie_samesite = app.config.get("CSRF_COOKIE_SAMESITE", "Lax") cookie_domain = app.config.get("CSRF_COOKIE_DOMAIN") or None cookie_path = app.config.get("CSRF_COOKIE_PATH", "/") try: max_age = int(app.config.get("WTF_CSRF_TIME_LIMIT", 3600)) except Exception: max_age = 3600 resp.set_cookie( cookie_name, token or "", max_age=max_age, secure=cookie_secure, httponly=cookie_httponly, samesite=cookie_samesite, domain=cookie_domain, path=cookie_path, ) except Exception: pass return resp # Register blueprints (centralized in blueprint_registry) from app.blueprint_registry import register_all_blueprints register_all_blueprints(app, logger) # Register integration connectors try: from app.integrations import registry logger.info("Integration connectors registered") except Exception as e: logger.warning(f"Could not register integration connectors: {e}") # Exempt API blueprints from CSRF protection (requires api_bp, api_v1_bp, api_docs_bp) from app.routes.api import api_bp from app.routes.api_docs import api_docs_bp from app.routes.api_v1 import api_v1_bp # Only if CSRF is enabled (JSON API uses token authentication, not CSRF tokens) if app.config.get("WTF_CSRF_ENABLED"): csrf.exempt(api_bp) csrf.exempt(api_v1_bp) csrf.exempt(api_docs_bp) # Initialize OIDC IP cache from app.utils.oidc_metadata import initialize_ip_cache ip_cache_ttl = int(app.config.get("OIDC_IP_CACHE_TTL", 300)) initialize_ip_cache(ip_cache_ttl) # Register OAuth OIDC client if enabled try: auth_method = (app.config.get("AUTH_METHOD") or "local").strip().lower() except Exception: auth_method = "local" if auth_method in ("oidc", "both"): issuer = app.config.get("OIDC_ISSUER") client_id = app.config.get("OIDC_CLIENT_ID") client_secret = app.config.get("OIDC_CLIENT_SECRET") scopes = app.config.get("OIDC_SCOPES", "openid profile email") if issuer and client_id and client_secret: # Try to fetch metadata first using our utility with better DNS handling from app.utils.oidc_metadata import fetch_oidc_metadata # Get retry configuration from environment max_retries = int(app.config.get("OIDC_METADATA_RETRY_ATTEMPTS", 3)) retry_delay = int(app.config.get("OIDC_METADATA_RETRY_DELAY", 2)) timeout = int(app.config.get("OIDC_METADATA_FETCH_TIMEOUT", 10)) dns_strategy = app.config.get("OIDC_DNS_RESOLUTION_STRATEGY", "auto") use_ip_directly = app.config.get("OIDC_USE_IP_DIRECTLY", True) use_docker_internal = app.config.get("OIDC_USE_DOCKER_INTERNAL", True) metadata, metadata_error, diagnostics = fetch_oidc_metadata( issuer, max_retries=max_retries, retry_delay=retry_delay, timeout=timeout, use_dns_test=True, dns_strategy=dns_strategy, use_ip_directly=use_ip_directly, use_docker_internal=use_docker_internal, ) # Log diagnostics if available if diagnostics: app.logger.info( "OIDC metadata fetch diagnostics: DNS strategy=%s, IP=%s, attempts=%d", diagnostics.get("dns_resolution", {}).get("strategy", "unknown"), diagnostics.get("dns_resolution", {}).get("ip_address", "none"), len(diagnostics.get("strategies_tried", [])), ) if metadata: # Successfully fetched metadata - register with it try: oauth.register( name="oidc", client_id=client_id, client_secret=client_secret, server_metadata_url=f"{issuer.rstrip('/')}/.well-known/openid-configuration", client_kwargs={ "scope": scopes, "code_challenge_method": "S256", }, ) app.logger.info("OIDC client registered with issuer %s", issuer) except Exception as e: app.logger.error("Failed to register OIDC client after metadata fetch: %s", e) else: # Metadata fetch failed - try to register anyway (Authlib will attempt fetch) # If that also fails, we'll handle it gracefully and store config for lazy loading app.logger.warning( "Failed to fetch OIDC metadata at startup: %s. " "Attempting to register client anyway - Authlib will retry metadata fetch.", metadata_error, ) try: oauth.register( name="oidc", client_id=client_id, client_secret=client_secret, server_metadata_url=f"{issuer.rstrip('/')}/.well-known/openid-configuration", client_kwargs={ "scope": scopes, "code_challenge_method": "S256", }, ) app.logger.info( "OIDC client registered (Authlib will handle metadata fetch) for issuer %s", issuer, ) except Exception as e: error_msg = str(e) # Check if it's a DNS resolution error if ( "NameResolutionError" in error_msg or "Failed to resolve" in error_msg or "[Errno -2]" in error_msg ): # Store config for lazy loading in login route app.config["OIDC_ISSUER_FOR_LAZY_LOAD"] = issuer app.config["OIDC_CLIENT_ID_FOR_LAZY_LOAD"] = client_id app.config["OIDC_CLIENT_SECRET_FOR_LAZY_LOAD"] = client_secret app.config["OIDC_SCOPES_FOR_LAZY_LOAD"] = scopes issuer_host = urlparse(issuer).netloc.split(":")[0] if issuer else "unknown" app.logger.warning( "OIDC client registration failed due to DNS resolution error: %s. " "Client will be created lazily on first login attempt. " "Troubleshooting:\n" "1. Verify DNS resolution: docker exec -it python -c \"import socket; print(socket.gethostbyname('%s'))\"\n" "2. Configure DNS servers in Docker/Portainer stack (add 'dns: [8.8.8.8, 8.8.4.4]' to service)\n" "3. If both containers are on same Docker network, use container name instead of external domain\n" "4. See docs/TROUBLESHOOTING_OIDC_DNS.md for detailed solutions", error_msg, issuer_host, ) else: issuer_host = urlparse(issuer).netloc.split(":")[0] if issuer else "unknown" app.logger.error( "Failed to register OIDC client: %s\n" "Troubleshooting:\n" "1. Verify DNS resolution: docker exec -it python -c \"import socket; print(socket.gethostbyname('%s'))\"\n" "2. Configure DNS servers in Docker/Portainer stack (add 'dns: [8.8.8.8, 8.8.4.4]' to service)\n" "3. If both containers are on same Docker network, use container name instead of external domain\n" "4. See docs/TROUBLESHOOTING_OIDC_DNS.md for detailed solutions", error_msg, issuer_host, ) else: app.logger.warning( "AUTH_METHOD is %s but OIDC envs are incomplete; OIDC login will not work", auth_method, ) # Schedule background metadata refresh if enabled refresh_interval = int(app.config.get("OIDC_METADATA_REFRESH_INTERVAL", 3600)) if refresh_interval > 0 and issuer and client_id and client_secret: def refresh_oidc_metadata(): """Background task to refresh OIDC metadata""" try: from app.utils.oidc_metadata import fetch_oidc_metadata max_retries = int(app.config.get("OIDC_METADATA_RETRY_ATTEMPTS", 3)) retry_delay = int(app.config.get("OIDC_METADATA_RETRY_DELAY", 2)) timeout = int(app.config.get("OIDC_METADATA_FETCH_TIMEOUT", 10)) dns_strategy = app.config.get("OIDC_DNS_RESOLUTION_STRATEGY", "auto") use_ip_directly = app.config.get("OIDC_USE_IP_DIRECTLY", True) use_docker_internal = app.config.get("OIDC_USE_DOCKER_INTERNAL", True) app.logger.info("Background OIDC metadata refresh started for issuer %s", issuer) metadata, metadata_error, diagnostics = fetch_oidc_metadata( issuer, max_retries=max_retries, retry_delay=retry_delay, timeout=timeout, use_dns_test=True, dns_strategy=dns_strategy, use_ip_directly=use_ip_directly, use_docker_internal=use_docker_internal, ) if metadata: app.logger.info( "Background OIDC metadata refresh successful (issuer: %s, strategy: %s)", metadata.get("issuer"), ( diagnostics.get("dns_resolution", {}).get("strategy", "unknown") if diagnostics else "unknown" ), ) else: app.logger.warning( "Background OIDC metadata refresh failed: %s (existing connection will continue to work)", metadata_error, ) except Exception as e: app.logger.error("Error in background OIDC metadata refresh: %s", str(e)) # Schedule the refresh task try: scheduler.add_job( func=refresh_oidc_metadata, trigger="interval", seconds=refresh_interval, id="oidc_metadata_refresh", replace_existing=True, max_instances=1, ) app.logger.info("Scheduled OIDC metadata refresh every %d seconds", refresh_interval) except Exception as e: app.logger.warning("Failed to schedule OIDC metadata refresh: %s", str(e)) # Prometheus metrics endpoint @app.route("/metrics") def metrics(): """Expose Prometheus metrics""" return generate_latest(), 200, {"Content-Type": CONTENT_TYPE_LATEST} # Register error handlers from app.utils.error_handlers import register_error_handlers register_error_handlers(app) # Register context processors from app.utils.context_processors import register_context_processors register_context_processors(app) # Register i18n template filters from app.utils.i18n_helpers import register_i18n_filters register_i18n_filters(app) # (translations compiled and directories set before Babel init) # Register template filters from app.utils.template_filters import register_template_filters register_template_filters(app) # Initialize module registry and helpers from app.utils.module_helpers import init_module_helpers init_module_helpers(app) # Register CLI commands from app.utils.cli import register_cli_commands register_cli_commands(app) # Promote configured admin usernames automatically on each request (idempotent) @app.before_request def _promote_admin_users_on_request(): try: from flask_login import current_user if not current_user or not getattr(current_user, "is_authenticated", False): return admin_usernames = [u.strip().lower() for u in app.config.get("ADMIN_USERNAMES", ["admin"])] if ( current_user.username and current_user.username.lower() in admin_usernames and current_user.role != "admin" ): current_user.role = "admin" db.session.commit() except Exception: # Non-fatal; avoid breaking requests if this fails try: db.session.rollback() except Exception: pass # Initialize database on first request def initialize_database(): try: # Import models to ensure they are registered from app.models import Comment, Issue, Project, Settings, Task, TaskActivity, TimeEntry, User # Create database tables db.create_all() # Check and migrate Task Management tables if needed from app.utils.legacy_migrations import migrate_issues_table, migrate_task_management_tables migrate_task_management_tables() migrate_issues_table() # Create default admin user or demo user if it doesn't exist if app.config.get("DEMO_MODE"): demo_username = (app.config.get("DEMO_USERNAME") or "demo").strip().lower() if not User.query.filter_by(username=demo_username).first(): from app.models import Role demo_user = User(username=demo_username, role="admin") demo_user.is_active = True demo_user.set_password(app.config.get("DEMO_PASSWORD", "demo")) admin_role = Role.query.filter_by(name="admin").first() if admin_role: demo_user.roles.append(admin_role) db.session.add(demo_user) db.session.commit() print(f"Created demo user: {demo_username}") else: admin_username = app.config.get("ADMIN_USERNAMES", ["admin"])[0] if not User.query.filter_by(username=admin_username).first(): from app.models import Role admin_user = User(username=admin_username, role="admin") admin_user.is_active = True admin_role = Role.query.filter_by(name="admin").first() if admin_role: admin_user.roles.append(admin_role) db.session.add(admin_user) db.session.commit() print(f"Created default admin user: {admin_username}") print("Database initialized successfully") except Exception as e: print(f"Error initializing database: {e}") # Don't raise the exception, just log it # Store the initialization function for later use app.initialize_database = initialize_database return app def init_database(app): """Initialize database tables and create default admin user""" with app.app_context(): try: # Import models to ensure they are registered from app.models import Comment, Issue, Project, Settings, Task, TaskActivity, TimeEntry, User # Create database tables db.create_all() # Check and migrate Task Management tables if needed from app.utils.legacy_migrations import migrate_issues_table, migrate_task_management_tables migrate_task_management_tables() migrate_issues_table() # Create default admin user or demo user if it doesn't exist if app.config.get("DEMO_MODE"): demo_username = (app.config.get("DEMO_USERNAME") or "demo").strip().lower() if not User.query.filter_by(username=demo_username).first(): from app.models import Role demo_user = User(username=demo_username, role="admin") demo_user.is_active = True demo_user.set_password(app.config.get("DEMO_PASSWORD", "demo")) admin_role = Role.query.filter_by(name="admin").first() if admin_role: demo_user.roles.append(admin_role) db.session.add(demo_user) db.session.commit() print(f"Created demo user: {demo_username}") else: admin_username = app.config.get("ADMIN_USERNAMES", ["admin"])[0] if not User.query.filter_by(username=admin_username).first(): from app.models import Role admin_user = User(username=admin_username, role="admin") admin_user.is_active = True admin_role = Role.query.filter_by(name="admin").first() if admin_role: admin_user.roles.append(admin_role) db.session.add(admin_user) db.session.commit() print(f"Created default admin user: {admin_username}") print("Database initialized successfully") except Exception as e: print(f"Error initializing database: {e}") raise