mirror of
https://github.com/DRYTRIX/TimeTracker.git
synced 2026-01-06 03:30:25 -06:00
- Simplify docker-compose setup and align environment defaults - Update README and Quick Start to reflect the new compose flow - Refine app initialization and configuration for clearer env handling - Minor consistency and cleanup in config modules No breaking changes expected.
1023 lines
39 KiB
Python
1023 lines
39 KiB
Python
import os
|
|
import logging
|
|
import uuid
|
|
import time
|
|
from datetime import timedelta
|
|
from flask import Flask, request, session, redirect, url_for, flash, jsonify, g
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_migrate import Migrate
|
|
from flask_login import LoginManager
|
|
from flask_socketio import SocketIO
|
|
from dotenv import load_dotenv
|
|
from flask_babel import Babel, _
|
|
from flask_wtf.csrf import CSRFProtect, CSRFError
|
|
from flask_limiter import Limiter
|
|
from flask_limiter.util import get_remote_address
|
|
from authlib.integrations.flask_client import OAuth
|
|
import re
|
|
from jinja2 import ChoiceLoader, FileSystemLoader
|
|
from urllib.parse import urlparse
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
from werkzeug.http import parse_options_header
|
|
from pythonjsonlogger import jsonlogger
|
|
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
|
|
import sentry_sdk
|
|
from sentry_sdk.integrations.flask import FlaskIntegration
|
|
import posthog
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# Initialize extensions
|
|
db = SQLAlchemy()
|
|
migrate = Migrate()
|
|
login_manager = LoginManager()
|
|
socketio = SocketIO()
|
|
babel = Babel()
|
|
csrf = CSRFProtect()
|
|
limiter = Limiter(key_func=get_remote_address, default_limits=[])
|
|
oauth = OAuth()
|
|
|
|
# Initialize Prometheus metrics
|
|
REQUEST_COUNT = Counter('tt_requests_total', 'Total requests', ['method', 'endpoint', 'http_status'])
|
|
REQUEST_LATENCY = Histogram('tt_request_latency_seconds', 'Request latency seconds', ['endpoint'])
|
|
|
|
# Initialize JSON logger for structured logging
|
|
json_logger = logging.getLogger("timetracker")
|
|
json_logger.setLevel(logging.INFO)
|
|
|
|
|
|
def log_event(name: str, **kwargs):
|
|
"""Log an event with structured JSON format including request context"""
|
|
try:
|
|
extra = {"request_id": getattr(g, "request_id", None), "event": name, **kwargs}
|
|
json_logger.info(name, extra=extra)
|
|
except Exception:
|
|
# Don't let logging errors break the application
|
|
pass
|
|
|
|
|
|
def identify_user(user_id, properties=None):
|
|
"""
|
|
Identify a user in PostHog with person properties.
|
|
|
|
Sets properties on the user for better segmentation, cohort analysis,
|
|
and personalization in PostHog.
|
|
|
|
Args:
|
|
user_id: The user ID (internal ID, not PII)
|
|
properties: Dict of properties to set (use $set and $set_once)
|
|
"""
|
|
try:
|
|
posthog_api_key = os.getenv("POSTHOG_API_KEY", "")
|
|
if not posthog_api_key:
|
|
return
|
|
|
|
posthog.identify(
|
|
distinct_id=str(user_id),
|
|
properties=properties or {}
|
|
)
|
|
except Exception:
|
|
# Don't let analytics errors break the application
|
|
pass
|
|
|
|
|
|
def track_event(user_id, event_name, properties=None):
|
|
"""
|
|
Track a product analytics event via PostHog.
|
|
|
|
Enhanced to include contextual properties like user agent, referrer,
|
|
and deployment info for better analysis.
|
|
|
|
Args:
|
|
user_id: The user ID (internal ID, not PII)
|
|
event_name: Name of the event (use resource.action format)
|
|
properties: Dict of event properties (no PII)
|
|
"""
|
|
try:
|
|
# Get PostHog API key - must be explicitly set to enable tracking
|
|
posthog_api_key = os.getenv("POSTHOG_API_KEY", "")
|
|
if not posthog_api_key:
|
|
return
|
|
|
|
# Enhance properties with context
|
|
enhanced_properties = properties or {}
|
|
|
|
# Add request context if available
|
|
try:
|
|
if request:
|
|
enhanced_properties.update({
|
|
"$current_url": request.url,
|
|
"$host": request.host,
|
|
"$pathname": request.path,
|
|
"$browser": request.user_agent.browser,
|
|
"$device_type": "mobile" if request.user_agent.platform in ["android", "iphone"] else "desktop",
|
|
"$os": request.user_agent.platform,
|
|
})
|
|
except Exception:
|
|
pass
|
|
|
|
# Add deployment context
|
|
# Get app version from analytics config
|
|
from app.config.analytics_defaults import get_analytics_config
|
|
analytics_config = get_analytics_config()
|
|
|
|
enhanced_properties.update({
|
|
"environment": os.getenv("FLASK_ENV", "production"),
|
|
"app_version": analytics_config.get("app_version"),
|
|
"deployment_method": "docker" if os.path.exists("/.dockerenv") else "native",
|
|
})
|
|
|
|
posthog.capture(
|
|
distinct_id=str(user_id),
|
|
event=event_name,
|
|
properties=enhanced_properties
|
|
)
|
|
except Exception:
|
|
# Don't let analytics errors break the application
|
|
pass
|
|
|
|
|
|
def create_app(config=None):
|
|
"""Application factory pattern"""
|
|
app = Flask(__name__)
|
|
|
|
# Make app aware of reverse proxy (scheme/host/port) for correct URL generation & cookies
|
|
# Trust a single proxy by default; adjust via env if needed
|
|
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1)
|
|
|
|
# Configuration
|
|
# Load env-specific config class
|
|
try:
|
|
env_name = os.getenv("FLASK_ENV", "production")
|
|
cfg_map = {
|
|
"development": "app.config.DevelopmentConfig",
|
|
"testing": "app.config.TestingConfig",
|
|
"production": "app.config.ProductionConfig",
|
|
}
|
|
app.config.from_object(cfg_map.get(env_name, "app.config.Config"))
|
|
except Exception:
|
|
app.config.from_object("app.config.Config")
|
|
if config:
|
|
app.config.update(config)
|
|
|
|
# Add top-level templates directory in addition to app/templates
|
|
extra_templates_path = os.path.abspath(
|
|
os.path.join(app.root_path, "..", "templates")
|
|
)
|
|
app.jinja_loader = ChoiceLoader(
|
|
[app.jinja_loader, FileSystemLoader(extra_templates_path)]
|
|
)
|
|
|
|
# Prefer Postgres if POSTGRES_* envs are present but URL points to SQLite
|
|
current_url = app.config.get("SQLALCHEMY_DATABASE_URI", "")
|
|
if (
|
|
not app.config.get("TESTING")
|
|
and isinstance(current_url, str)
|
|
and current_url.startswith("sqlite")
|
|
and (
|
|
os.getenv("POSTGRES_DB")
|
|
or os.getenv("POSTGRES_USER")
|
|
or os.getenv("POSTGRES_PASSWORD")
|
|
)
|
|
):
|
|
pg_user = os.getenv("POSTGRES_USER", "timetracker")
|
|
pg_pass = os.getenv("POSTGRES_PASSWORD", "timetracker")
|
|
pg_db = os.getenv("POSTGRES_DB", "timetracker")
|
|
pg_host = os.getenv("POSTGRES_HOST", "db")
|
|
app.config["SQLALCHEMY_DATABASE_URI"] = (
|
|
f"postgresql+psycopg2://{pg_user}:{pg_pass}@{pg_host}:5432/{pg_db}"
|
|
)
|
|
|
|
# Initialize extensions
|
|
db.init_app(app)
|
|
migrate.init_app(app, db)
|
|
login_manager.init_app(app)
|
|
socketio.init_app(app, cors_allowed_origins="*")
|
|
oauth.init_app(app)
|
|
|
|
# Only initialize CSRF protection if enabled
|
|
if app.config.get('WTF_CSRF_ENABLED'):
|
|
csrf.init_app(app)
|
|
try:
|
|
# Configure limiter defaults from config if provided
|
|
default_limits = []
|
|
raw = app.config.get("RATELIMIT_DEFAULT")
|
|
if raw:
|
|
# support semicolon or comma separated limits
|
|
parts = [
|
|
p.strip() for p in str(raw).replace(",", ";").split(";") if p.strip()
|
|
]
|
|
if parts:
|
|
default_limits = parts
|
|
limiter._default_limits = default_limits # set after init
|
|
limiter.init_app(app)
|
|
except Exception:
|
|
limiter.init_app(app)
|
|
|
|
# Ensure translations exist and configure absolute translation directories before Babel init
|
|
try:
|
|
translations_dirs = (
|
|
app.config.get("BABEL_TRANSLATION_DIRECTORIES") or "translations"
|
|
).split(",")
|
|
base_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
|
abs_dirs = []
|
|
for d in translations_dirs:
|
|
d = d.strip()
|
|
if not d:
|
|
continue
|
|
abs_dirs.append(
|
|
d if os.path.isabs(d) else os.path.abspath(os.path.join(base_path, d))
|
|
)
|
|
if abs_dirs:
|
|
app.config["BABEL_TRANSLATION_DIRECTORIES"] = os.pathsep.join(abs_dirs)
|
|
# Best-effort compile with Babel CLI if available, else Python fallback
|
|
try:
|
|
import subprocess
|
|
|
|
subprocess.run(["pybabel", "compile", "-d", abs_dirs[0]], check=False)
|
|
except Exception:
|
|
pass
|
|
from app.utils.i18n import ensure_translations_compiled
|
|
|
|
for d in abs_dirs:
|
|
ensure_translations_compiled(d)
|
|
except Exception:
|
|
pass
|
|
|
|
# Internationalization: locale selector compatible with Flask-Babel v4+
|
|
def _select_locale():
|
|
try:
|
|
# 1) User preference from DB
|
|
from flask_login import current_user
|
|
|
|
if current_user and getattr(current_user, "is_authenticated", False):
|
|
pref = getattr(current_user, "preferred_language", None)
|
|
if pref:
|
|
return pref
|
|
# 2) Session override (set-language route)
|
|
if "preferred_language" in session:
|
|
return session.get("preferred_language")
|
|
# 3) Best match with Accept-Language
|
|
supported = list(app.config.get("LANGUAGES", {}).keys()) or ["en"]
|
|
return request.accept_languages.best_match(supported) or app.config.get(
|
|
"BABEL_DEFAULT_LOCALE", "en"
|
|
)
|
|
except Exception:
|
|
return app.config.get("BABEL_DEFAULT_LOCALE", "en")
|
|
|
|
babel.init_app(
|
|
app,
|
|
default_locale=app.config.get("BABEL_DEFAULT_LOCALE", "en"),
|
|
default_timezone=app.config.get("TZ", "Europe/Rome"),
|
|
locale_selector=_select_locale,
|
|
)
|
|
|
|
# Ensure gettext helpers available in Jinja
|
|
try:
|
|
from flask_babel import gettext as _gettext, ngettext as _ngettext
|
|
|
|
app.jinja_env.globals.update(_=_gettext, ngettext=_ngettext)
|
|
except Exception:
|
|
pass
|
|
|
|
# Log effective database URL (mask password)
|
|
db_url = app.config.get("SQLALCHEMY_DATABASE_URI", "")
|
|
try:
|
|
masked_db_url = re.sub(r"//([^:]+):[^@]+@", r"//\\1:***@", db_url)
|
|
except Exception:
|
|
masked_db_url = db_url
|
|
app.logger.info(f"Using database URL: {masked_db_url}")
|
|
|
|
# Configure login manager
|
|
login_manager.login_view = "auth.login"
|
|
login_manager.login_message = "Please log in to access this page."
|
|
login_manager.login_message_category = "info"
|
|
|
|
# Internationalization selector handled via babel.init_app(locale_selector=...)
|
|
|
|
# Register user loader
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
"""Load user for Flask-Login"""
|
|
from app.models import User
|
|
|
|
return User.query.get(int(user_id))
|
|
|
|
# Check if initial setup is required (skip for certain routes)
|
|
@app.before_request
|
|
def check_setup_required():
|
|
try:
|
|
# Skip setup check in testing mode
|
|
if app.config.get('TESTING'):
|
|
return
|
|
|
|
# Skip setup check for these routes
|
|
skip_routes = ['setup.initial_setup', 'static', 'auth.login', 'auth.logout', 'main.health_check', 'main.readiness_check']
|
|
if request.endpoint in skip_routes:
|
|
return
|
|
|
|
# Skip for assets and health checks
|
|
if request.path.startswith('/static/') or request.path.startswith('/_'):
|
|
return
|
|
|
|
# Check if setup is complete
|
|
from app.utils.installation import get_installation_config
|
|
installation_config = get_installation_config()
|
|
|
|
if not installation_config.is_setup_complete():
|
|
return redirect(url_for('setup.initial_setup'))
|
|
except Exception:
|
|
pass
|
|
|
|
# Attach request ID for tracing
|
|
@app.before_request
|
|
def attach_request_id():
|
|
try:
|
|
g.request_id = request.headers.get('X-Request-ID') or str(uuid.uuid4())
|
|
except Exception:
|
|
pass
|
|
|
|
# Start timer for Prometheus metrics
|
|
@app.before_request
|
|
def prom_start_timer():
|
|
try:
|
|
g._start_time = time.time()
|
|
except Exception:
|
|
pass
|
|
|
|
# Request logging for /login to trace POSTs reaching the app
|
|
@app.before_request
|
|
def log_login_requests():
|
|
try:
|
|
if request.path == "/login":
|
|
app.logger.info(
|
|
"%s %s from %s UA=%s",
|
|
request.method,
|
|
request.path,
|
|
request.headers.get("X-Forwarded-For") or request.remote_addr,
|
|
request.headers.get("User-Agent"),
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
# Record Prometheus metrics and log write operations
|
|
@app.after_request
|
|
def record_metrics_and_log(response):
|
|
try:
|
|
# Record Prometheus metrics
|
|
latency = time.time() - getattr(g, '_start_time', time.time())
|
|
endpoint = request.endpoint or "unknown"
|
|
REQUEST_LATENCY.labels(endpoint=endpoint).observe(latency)
|
|
REQUEST_COUNT.labels(
|
|
method=request.method,
|
|
endpoint=endpoint,
|
|
http_status=response.status_code
|
|
).inc()
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
# Log write operations
|
|
if request.method in ("POST", "PUT", "PATCH", "DELETE"):
|
|
app.logger.info(
|
|
"%s %s -> %s from %s",
|
|
request.method,
|
|
request.path,
|
|
response.status_code,
|
|
request.headers.get("X-Forwarded-For") or request.remote_addr,
|
|
)
|
|
except Exception:
|
|
pass
|
|
return response
|
|
|
|
# Configure session
|
|
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(
|
|
seconds=int(os.getenv("PERMANENT_SESSION_LIFETIME", 86400))
|
|
)
|
|
|
|
# Setup logging (including JSON logging)
|
|
setup_logging(app)
|
|
|
|
# Load analytics configuration (embedded at build time)
|
|
from app.config.analytics_defaults import get_analytics_config, has_analytics_configured
|
|
analytics_config = get_analytics_config()
|
|
|
|
# Log analytics status (for transparency)
|
|
if has_analytics_configured():
|
|
app.logger.info("TimeTracker with analytics configured (telemetry opt-in via admin dashboard)")
|
|
else:
|
|
app.logger.info("TimeTracker build without analytics configuration")
|
|
|
|
# Initialize Sentry for error monitoring
|
|
# Priority: Env var > Built-in default > Disabled
|
|
sentry_dsn = analytics_config.get("sentry_dsn", "")
|
|
if sentry_dsn:
|
|
try:
|
|
sentry_sdk.init(
|
|
dsn=sentry_dsn,
|
|
integrations=[FlaskIntegration()],
|
|
traces_sample_rate=analytics_config.get("sentry_traces_rate", 0.0),
|
|
environment=os.getenv("FLASK_ENV", "production"),
|
|
release=analytics_config.get("app_version")
|
|
)
|
|
app.logger.info("Sentry error monitoring initialized")
|
|
except Exception as e:
|
|
app.logger.warning(f"Failed to initialize Sentry: {e}")
|
|
|
|
# Initialize PostHog for product analytics
|
|
# Priority: Env var > Built-in default > Disabled
|
|
posthog_api_key = analytics_config.get("posthog_api_key", "")
|
|
posthog_host = analytics_config.get("posthog_host", "https://app.posthog.com")
|
|
if posthog_api_key:
|
|
try:
|
|
posthog.project_api_key = posthog_api_key
|
|
posthog.host = posthog_host
|
|
app.logger.info(f"PostHog product analytics initialized (host: {posthog_host})")
|
|
except Exception as e:
|
|
app.logger.warning(f"Failed to initialize PostHog: {e}")
|
|
|
|
# Fail-fast on weak/missing secret in production
|
|
if not app.debug and app.config.get("FLASK_ENV", "production") == "production":
|
|
secret = app.config.get("SECRET_KEY")
|
|
placeholder_values = {"dev-secret-key-change-in-production", "your-secret-key-change-this", "your-secret-key-here"}
|
|
if (not secret) or (secret in placeholder_values) or (isinstance(secret, str) and len(secret) < 32):
|
|
app.logger.error("Invalid SECRET_KEY configured in production; refusing to start")
|
|
raise RuntimeError("Invalid SECRET_KEY in production")
|
|
|
|
# Apply security headers and a basic CSP
|
|
@app.after_request
|
|
def apply_security_headers(response):
|
|
try:
|
|
headers = app.config.get("SECURITY_HEADERS", {}) or {}
|
|
for k, v in headers.items():
|
|
# do not overwrite existing header if already present
|
|
if not response.headers.get(k):
|
|
response.headers[k] = v
|
|
# Minimal CSP allowing our own resources and common CDNs used in templates
|
|
if not response.headers.get("Content-Security-Policy"):
|
|
csp = (
|
|
"default-src 'self'; "
|
|
"img-src 'self' data: https:; "
|
|
"style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://cdnjs.cloudflare.com https://fonts.googleapis.com https://cdn.datatables.net; "
|
|
"font-src 'self' https://fonts.gstatic.com https://cdnjs.cloudflare.com data:; "
|
|
"script-src 'self' 'unsafe-inline' https://code.jquery.com https://cdn.datatables.net https://cdnjs.cloudflare.com https://cdn.jsdelivr.net https://esm.sh; "
|
|
"connect-src 'self' ws: wss:; "
|
|
"frame-ancestors 'none'"
|
|
)
|
|
response.headers["Content-Security-Policy"] = csp
|
|
# Additional privacy headers
|
|
if not response.headers.get("Referrer-Policy"):
|
|
response.headers["Referrer-Policy"] = "no-referrer"
|
|
if not response.headers.get("Permissions-Policy"):
|
|
response.headers["Permissions-Policy"] = (
|
|
"geolocation=(), microphone=(), camera=()"
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
# CSRF cookie/token handling
|
|
# If CSRF is enabled, ensure CSRF cookie exists for HTML GET responses
|
|
# If CSRF is disabled, explicitly clear any existing CSRF cookie to avoid confusion
|
|
if app.config.get('WTF_CSRF_ENABLED'):
|
|
try:
|
|
# Only for safe, HTML page responses
|
|
if request.method == "GET":
|
|
content_type = response.headers.get("Content-Type", "")
|
|
if isinstance(content_type, str) and content_type.startswith("text/html"):
|
|
cookie_name = app.config.get("CSRF_COOKIE_NAME", "XSRF-TOKEN")
|
|
has_cookie = bool(request.cookies.get(cookie_name))
|
|
if not has_cookie:
|
|
# Generate a CSRF token and set cookie using same settings as /auth/csrf-token
|
|
try:
|
|
from flask_wtf.csrf import generate_csrf
|
|
token = generate_csrf()
|
|
except Exception:
|
|
token = ""
|
|
cookie_secure = bool(
|
|
app.config.get(
|
|
"CSRF_COOKIE_SECURE",
|
|
app.config.get("SESSION_COOKIE_SECURE", False),
|
|
)
|
|
)
|
|
cookie_httponly = bool(app.config.get("CSRF_COOKIE_HTTPONLY", False))
|
|
cookie_samesite = app.config.get("CSRF_COOKIE_SAMESITE", "Lax")
|
|
cookie_domain = app.config.get("CSRF_COOKIE_DOMAIN") or None
|
|
cookie_path = app.config.get("CSRF_COOKIE_PATH", "/")
|
|
try:
|
|
max_age = int(app.config.get("WTF_CSRF_TIME_LIMIT", 3600))
|
|
except Exception:
|
|
max_age = 3600
|
|
response.set_cookie(
|
|
cookie_name,
|
|
token or "",
|
|
max_age=max_age,
|
|
secure=cookie_secure,
|
|
httponly=cookie_httponly,
|
|
samesite=cookie_samesite,
|
|
domain=cookie_domain,
|
|
path=cookie_path,
|
|
)
|
|
except Exception:
|
|
pass
|
|
else:
|
|
try:
|
|
cookie_name = app.config.get("CSRF_COOKIE_NAME", "XSRF-TOKEN")
|
|
if request.cookies.get(cookie_name):
|
|
# Clear the cookie by setting it expired
|
|
response.set_cookie(
|
|
cookie_name,
|
|
"",
|
|
max_age=0,
|
|
expires=0,
|
|
path=app.config.get("CSRF_COOKIE_PATH", "/"),
|
|
domain=app.config.get("CSRF_COOKIE_DOMAIN") or None,
|
|
secure=bool(app.config.get("CSRF_COOKIE_SECURE", app.config.get("SESSION_COOKIE_SECURE", False))),
|
|
httponly=bool(app.config.get("CSRF_COOKIE_HTTPONLY", False)),
|
|
samesite=app.config.get("CSRF_COOKIE_SAMESITE", "Lax"),
|
|
)
|
|
except Exception:
|
|
pass
|
|
return response
|
|
|
|
# CSRF error handler with HTML-friendly fallback
|
|
@app.errorhandler(CSRFError)
|
|
def handle_csrf_error(e):
|
|
# Prefer HTML flow for classic form posts regardless of Accept header quirks
|
|
try:
|
|
mimetype, _ = parse_options_header(request.headers.get("Content-Type", ""))
|
|
is_classic_form = mimetype in ("application/x-www-form-urlencoded", "multipart/form-data")
|
|
except Exception:
|
|
is_classic_form = False
|
|
|
|
# Log details for diagnostics
|
|
try:
|
|
try:
|
|
from flask_login import current_user as _cu
|
|
user_id = getattr(_cu, "id", None) if getattr(_cu, "is_authenticated", False) else None
|
|
except Exception:
|
|
user_id = None
|
|
app.logger.warning(
|
|
"CSRF failure: path=%s method=%s form=%s json=%s ref=%s user=%s reason=%s",
|
|
request.path,
|
|
request.method,
|
|
bool(request.form),
|
|
request.is_json,
|
|
request.referrer,
|
|
user_id,
|
|
getattr(e, "description", "")
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
if request.method == "POST" and (is_classic_form or (request.form and not request.is_json)):
|
|
try:
|
|
flash(_("Your session expired or the page was open too long. Please try again."), "warning")
|
|
except Exception:
|
|
flash("Your session expired or the page was open too long. Please try again.", "warning")
|
|
|
|
# Redirect back to a safe same-origin referrer if available, else to dashboard
|
|
dest = url_for("main.dashboard")
|
|
try:
|
|
ref = request.referrer
|
|
if ref:
|
|
ref_host = urlparse(ref).netloc
|
|
cur_host = urlparse(request.host_url).netloc
|
|
if ref_host and ref_host == cur_host:
|
|
dest = ref
|
|
except Exception:
|
|
pass
|
|
return redirect(dest)
|
|
|
|
# JSON/XHR fall-through
|
|
try:
|
|
wants_json = (
|
|
request.is_json
|
|
or request.headers.get("X-Requested-With") == "XMLHttpRequest"
|
|
or request.accept_mimetypes["application/json"]
|
|
>= request.accept_mimetypes["text/html"]
|
|
)
|
|
except Exception:
|
|
wants_json = False
|
|
|
|
if wants_json:
|
|
return jsonify(error="csrf_token_missing_or_invalid"), 400
|
|
|
|
# Default to HTML-friendly behavior
|
|
try:
|
|
flash(_("Your session expired or the page was open too long. Please try again."), "warning")
|
|
except Exception:
|
|
flash("Your session expired or the page was open too long. Please try again.", "warning")
|
|
dest = url_for("main.dashboard")
|
|
try:
|
|
ref = request.referrer
|
|
if ref:
|
|
ref_host = urlparse(ref).netloc
|
|
cur_host = urlparse(request.host_url).netloc
|
|
if ref_host and ref_host == cur_host:
|
|
dest = ref
|
|
except Exception:
|
|
pass
|
|
return redirect(dest)
|
|
|
|
# Expose csrf_token() in Jinja templates even without FlaskForm
|
|
# Always inject the function, but return empty string when CSRF is disabled
|
|
@app.context_processor
|
|
def inject_csrf_token():
|
|
def get_csrf_token():
|
|
# Return empty string if CSRF is disabled
|
|
if not app.config.get('WTF_CSRF_ENABLED'):
|
|
return ""
|
|
try:
|
|
from flask_wtf.csrf import generate_csrf
|
|
return generate_csrf()
|
|
except Exception:
|
|
return ""
|
|
return dict(csrf_token=get_csrf_token)
|
|
|
|
# CSRF token refresh endpoint (GET)
|
|
@app.route("/auth/csrf-token", methods=["GET"])
|
|
def get_csrf_token():
|
|
# If CSRF is disabled, return empty token
|
|
if not app.config.get('WTF_CSRF_ENABLED'):
|
|
resp = jsonify(csrf_token="", csrf_enabled=False)
|
|
resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
|
|
return resp
|
|
|
|
try:
|
|
from flask_wtf.csrf import generate_csrf
|
|
|
|
token = generate_csrf()
|
|
except Exception:
|
|
token = ""
|
|
resp = jsonify(csrf_token=token, csrf_enabled=True)
|
|
try:
|
|
resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
|
|
except Exception:
|
|
pass
|
|
# Also set/update a CSRF cookie for double-submit pattern and SPA helpers
|
|
try:
|
|
cookie_name = app.config.get("CSRF_COOKIE_NAME", "XSRF-TOKEN")
|
|
# Derive defaults from session cookie flags if not explicitly set
|
|
cookie_secure = bool(
|
|
app.config.get(
|
|
"CSRF_COOKIE_SECURE",
|
|
app.config.get("SESSION_COOKIE_SECURE", False),
|
|
)
|
|
)
|
|
cookie_httponly = bool(app.config.get("CSRF_COOKIE_HTTPONLY", False))
|
|
cookie_samesite = app.config.get("CSRF_COOKIE_SAMESITE", "Lax")
|
|
cookie_domain = app.config.get("CSRF_COOKIE_DOMAIN") or None
|
|
cookie_path = app.config.get("CSRF_COOKIE_PATH", "/")
|
|
try:
|
|
max_age = int(app.config.get("WTF_CSRF_TIME_LIMIT", 3600))
|
|
except Exception:
|
|
max_age = 3600
|
|
resp.set_cookie(
|
|
cookie_name,
|
|
token or "",
|
|
max_age=max_age,
|
|
secure=cookie_secure,
|
|
httponly=cookie_httponly,
|
|
samesite=cookie_samesite,
|
|
domain=cookie_domain,
|
|
path=cookie_path,
|
|
)
|
|
except Exception:
|
|
pass
|
|
return resp
|
|
|
|
# Register blueprints
|
|
from app.routes.auth import auth_bp
|
|
from app.routes.main import main_bp
|
|
from app.routes.projects import projects_bp
|
|
from app.routes.timer import timer_bp
|
|
from app.routes.reports import reports_bp
|
|
from app.routes.admin import admin_bp
|
|
from app.routes.api import api_bp
|
|
from app.routes.analytics import analytics_bp
|
|
from app.routes.tasks import tasks_bp
|
|
from app.routes.invoices import invoices_bp
|
|
from app.routes.clients import clients_bp
|
|
from app.routes.comments import comments_bp
|
|
from app.routes.kanban import kanban_bp
|
|
from app.routes.setup import setup_bp
|
|
|
|
app.register_blueprint(auth_bp)
|
|
app.register_blueprint(main_bp)
|
|
app.register_blueprint(projects_bp)
|
|
app.register_blueprint(timer_bp)
|
|
app.register_blueprint(reports_bp)
|
|
app.register_blueprint(admin_bp)
|
|
app.register_blueprint(api_bp)
|
|
app.register_blueprint(analytics_bp)
|
|
app.register_blueprint(tasks_bp)
|
|
app.register_blueprint(invoices_bp)
|
|
app.register_blueprint(clients_bp)
|
|
app.register_blueprint(comments_bp)
|
|
app.register_blueprint(kanban_bp)
|
|
app.register_blueprint(setup_bp)
|
|
|
|
# Exempt API blueprint from CSRF protection (JSON API uses authentication, not CSRF tokens)
|
|
# Only if CSRF is enabled
|
|
if app.config.get('WTF_CSRF_ENABLED'):
|
|
csrf.exempt(api_bp)
|
|
|
|
# Register OAuth OIDC client if enabled
|
|
try:
|
|
auth_method = (app.config.get("AUTH_METHOD") or "local").strip().lower()
|
|
except Exception:
|
|
auth_method = "local"
|
|
|
|
if auth_method in ("oidc", "both"):
|
|
issuer = app.config.get("OIDC_ISSUER")
|
|
client_id = app.config.get("OIDC_CLIENT_ID")
|
|
client_secret = app.config.get("OIDC_CLIENT_SECRET")
|
|
scopes = app.config.get("OIDC_SCOPES", "openid profile email")
|
|
if issuer and client_id and client_secret:
|
|
try:
|
|
oauth.register(
|
|
name="oidc",
|
|
client_id=client_id,
|
|
client_secret=client_secret,
|
|
server_metadata_url=f"{issuer.rstrip('/')}/.well-known/openid-configuration",
|
|
client_kwargs={
|
|
"scope": scopes,
|
|
"code_challenge_method": "S256",
|
|
},
|
|
)
|
|
app.logger.info("OIDC client registered with issuer %s", issuer)
|
|
except Exception as e:
|
|
app.logger.error("Failed to register OIDC client: %s", e)
|
|
else:
|
|
app.logger.warning(
|
|
"AUTH_METHOD is %s but OIDC envs are incomplete; OIDC login will not work",
|
|
auth_method,
|
|
)
|
|
|
|
# Prometheus metrics endpoint
|
|
@app.route('/metrics')
|
|
def metrics():
|
|
"""Expose Prometheus metrics"""
|
|
return generate_latest(), 200, {'Content-Type': CONTENT_TYPE_LATEST}
|
|
|
|
# Register error handlers
|
|
from app.utils.error_handlers import register_error_handlers
|
|
|
|
register_error_handlers(app)
|
|
|
|
# Register context processors
|
|
from app.utils.context_processors import register_context_processors
|
|
|
|
register_context_processors(app)
|
|
|
|
# (translations compiled and directories set before Babel init)
|
|
|
|
# Register template filters
|
|
from app.utils.template_filters import register_template_filters
|
|
|
|
register_template_filters(app)
|
|
|
|
# Register CLI commands
|
|
from app.utils.cli import register_cli_commands
|
|
|
|
register_cli_commands(app)
|
|
|
|
# Promote configured admin usernames automatically on each request (idempotent)
|
|
@app.before_request
|
|
def _promote_admin_users_on_request():
|
|
try:
|
|
from flask_login import current_user
|
|
|
|
if not current_user or not getattr(current_user, "is_authenticated", False):
|
|
return
|
|
admin_usernames = [
|
|
u.strip().lower() for u in app.config.get("ADMIN_USERNAMES", ["admin"])
|
|
]
|
|
if (
|
|
current_user.username
|
|
and current_user.username.lower() in admin_usernames
|
|
and current_user.role != "admin"
|
|
):
|
|
current_user.role = "admin"
|
|
db.session.commit()
|
|
except Exception:
|
|
# Non-fatal; avoid breaking requests if this fails
|
|
try:
|
|
db.session.rollback()
|
|
except Exception:
|
|
pass
|
|
|
|
# Initialize database on first request
|
|
def initialize_database():
|
|
try:
|
|
# Import models to ensure they are registered
|
|
from app.models import (
|
|
User,
|
|
Project,
|
|
TimeEntry,
|
|
Task,
|
|
Settings,
|
|
TaskActivity,
|
|
Comment,
|
|
)
|
|
|
|
# Create database tables
|
|
db.create_all()
|
|
|
|
# Check and migrate Task Management tables if needed
|
|
migrate_task_management_tables()
|
|
|
|
# Create default admin user if it doesn't exist
|
|
admin_username = app.config.get("ADMIN_USERNAMES", ["admin"])[0]
|
|
if not User.query.filter_by(username=admin_username).first():
|
|
admin_user = User(username=admin_username, role="admin")
|
|
admin_user.is_active = True
|
|
db.session.add(admin_user)
|
|
db.session.commit()
|
|
print(f"Created default admin user: {admin_username}")
|
|
|
|
print("Database initialized successfully")
|
|
except Exception as e:
|
|
print(f"Error initializing database: {e}")
|
|
# Don't raise the exception, just log it
|
|
|
|
# Store the initialization function for later use
|
|
app.initialize_database = initialize_database
|
|
|
|
return app
|
|
|
|
|
|
def setup_logging(app):
|
|
"""Setup application logging including JSON logging"""
|
|
log_level = os.getenv("LOG_LEVEL", "INFO")
|
|
# Default to a file in the project logs directory if not provided
|
|
default_log_path = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(os.path.dirname(__file__)), "logs", "timetracker.log"
|
|
)
|
|
)
|
|
log_file = os.getenv("LOG_FILE", default_log_path)
|
|
|
|
# JSON log file path
|
|
json_log_path = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(os.path.dirname(__file__)), "logs", "app.jsonl"
|
|
)
|
|
)
|
|
|
|
# Prepare handlers
|
|
handlers = [logging.StreamHandler()]
|
|
|
|
# Add file handler (default or specified)
|
|
try:
|
|
# Ensure log directory exists
|
|
log_dir = os.path.dirname(log_file)
|
|
if log_dir and not os.path.exists(log_dir):
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
|
|
# Create file handler
|
|
file_handler = logging.FileHandler(log_file)
|
|
handlers.append(file_handler)
|
|
except (PermissionError, OSError) as e:
|
|
print(f"Warning: Could not create log file '{log_file}': {e}")
|
|
print("Logging to console only")
|
|
# Don't add file handler, just use console logging
|
|
|
|
# Configure Flask app logger directly (works well under gunicorn)
|
|
for handler in handlers:
|
|
handler.setLevel(getattr(logging, log_level.upper()))
|
|
handler.setFormatter(
|
|
logging.Formatter(
|
|
"%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]"
|
|
)
|
|
)
|
|
|
|
# Clear existing handlers to avoid duplicate logs
|
|
app.logger.handlers.clear()
|
|
app.logger.propagate = False
|
|
app.logger.setLevel(getattr(logging, log_level.upper()))
|
|
for handler in handlers:
|
|
app.logger.addHandler(handler)
|
|
|
|
# Also configure root logger so modules using logging.getLogger() are captured
|
|
root_logger = logging.getLogger()
|
|
root_logger.setLevel(getattr(logging, log_level.upper()))
|
|
# Avoid duplicating handlers if already attached
|
|
root_logger.handlers = []
|
|
for handler in handlers:
|
|
root_logger.addHandler(handler)
|
|
|
|
# Setup JSON logging for structured events
|
|
try:
|
|
json_log_dir = os.path.dirname(json_log_path)
|
|
if json_log_dir and not os.path.exists(json_log_dir):
|
|
os.makedirs(json_log_dir, exist_ok=True)
|
|
|
|
json_handler = logging.FileHandler(json_log_path)
|
|
json_formatter = jsonlogger.JsonFormatter(
|
|
'%(asctime)s %(levelname)s %(name)s %(message)s'
|
|
)
|
|
json_handler.setFormatter(json_formatter)
|
|
json_handler.setLevel(logging.INFO)
|
|
|
|
# Add JSON handler to the timetracker logger
|
|
json_logger.handlers.clear()
|
|
json_logger.addHandler(json_handler)
|
|
json_logger.propagate = False
|
|
|
|
app.logger.info(f"JSON logging initialized: {json_log_path}")
|
|
except (PermissionError, OSError) as e:
|
|
app.logger.warning(f"Could not initialize JSON logging: {e}")
|
|
|
|
# Suppress noisy logs in production
|
|
if not app.debug:
|
|
logging.getLogger("werkzeug").setLevel(logging.ERROR)
|
|
|
|
|
|
def migrate_task_management_tables():
|
|
"""Check and migrate Task Management tables if they don't exist"""
|
|
try:
|
|
from sqlalchemy import inspect, text
|
|
|
|
# Check if tasks table exists
|
|
inspector = inspect(db.engine)
|
|
existing_tables = inspector.get_table_names()
|
|
|
|
if "tasks" not in existing_tables:
|
|
print("Task Management: Creating tasks table...")
|
|
# Create the tasks table
|
|
db.create_all()
|
|
print("✓ Tasks table created successfully")
|
|
else:
|
|
print("Task Management: Tasks table already exists")
|
|
|
|
# Check if task_id column exists in time_entries table
|
|
if "time_entries" in existing_tables:
|
|
time_entries_columns = [
|
|
col["name"] for col in inspector.get_columns("time_entries")
|
|
]
|
|
if "task_id" not in time_entries_columns:
|
|
print("Task Management: Adding task_id column to time_entries table...")
|
|
try:
|
|
# Add task_id column to time_entries table
|
|
db.engine.execute(
|
|
text(
|
|
"ALTER TABLE time_entries ADD COLUMN task_id INTEGER REFERENCES tasks(id)"
|
|
)
|
|
)
|
|
print("✓ task_id column added to time_entries table")
|
|
except Exception as e:
|
|
print(f"⚠ Warning: Could not add task_id column: {e}")
|
|
print(
|
|
" You may need to manually add this column or recreate the database"
|
|
)
|
|
else:
|
|
print(
|
|
"Task Management: task_id column already exists in time_entries table"
|
|
)
|
|
|
|
print("Task Management migration check completed")
|
|
|
|
except Exception as e:
|
|
print(f"⚠ Warning: Task Management migration check failed: {e}")
|
|
print(
|
|
" The application will continue, but Task Management features may not work properly"
|
|
)
|
|
|
|
|
|
def init_database(app):
|
|
"""Initialize database tables and create default admin user"""
|
|
with app.app_context():
|
|
try:
|
|
# Import models to ensure they are registered
|
|
from app.models import (
|
|
User,
|
|
Project,
|
|
TimeEntry,
|
|
Task,
|
|
Settings,
|
|
TaskActivity,
|
|
Comment,
|
|
)
|
|
|
|
# Create database tables
|
|
db.create_all()
|
|
|
|
# Check and migrate Task Management tables if needed
|
|
migrate_task_management_tables()
|
|
|
|
# Create default admin user if it doesn't exist
|
|
admin_username = app.config.get("ADMIN_USERNAMES", ["admin"])[0]
|
|
if not User.query.filter_by(username=admin_username).first():
|
|
admin_user = User(username=admin_username, role="admin")
|
|
admin_user.is_active = True
|
|
db.session.add(admin_user)
|
|
db.session.commit()
|
|
print(f"Created default admin user: {admin_username}")
|
|
|
|
print("Database initialized successfully")
|
|
except Exception as e:
|
|
print(f"Error initializing database: {e}")
|
|
raise
|