Files
Warracker/backend/auth_utils.py
2025-09-30 20:58:34 +08:00

139 lines
5.1 KiB
Python

# backend/auth_utils.py
import jwt
from datetime import datetime, UTC, timedelta
from flask import current_app, request, jsonify
from functools import wraps
import re
# IMPORTANT: We need to import db_handler here for the decorators
try:
from . import db_handler
except ImportError:
import db_handler
def generate_token(user_id):
"""Generate a JWT token for the user"""
payload = {
'exp': datetime.now(UTC) + current_app.config['JWT_EXPIRATION_DELTA'],
'iat': datetime.now(UTC),
'sub': str(user_id)
}
return jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')
def decode_token(token):
"""Decode a JWT token and return the user_id"""
try:
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
return payload['sub']
except jwt.ExpiredSignatureError:
return None # Token has expired
except jwt.InvalidTokenError:
return None # Invalid token
def token_required(f):
"""Decorator to protect routes that require authentication"""
@wraps(f)
def decorated(*args, **kwargs):
token = None
# Get token from Authorization header
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
# If no token in header, check form data for POST requests
if not token and request.method == 'POST':
token = request.form.get('auth_token') # Check form data
# If still no token, check URL query parameters
if not token:
token = request.args.get('token') # Check query parameters
# If no token is provided
if not token:
current_app.logger.warning(f"Authentication attempt without token: {request.path}")
return jsonify({'message': 'Authentication token is missing!'}), 401
# Decode the token
user_id = decode_token(token)
if not user_id:
current_app.logger.warning(f"Invalid token used for: {request.path}")
return jsonify({'message': 'Invalid or expired token!'}), 401
# Check if user exists
conn = None
try:
conn = db_handler.get_db_connection()
with conn.cursor() as cur:
# Try to get user with is_owner column, fall back to without it if column doesn't exist
try:
cur.execute('SELECT id, username, email, is_admin, is_owner, oidc_sub FROM users WHERE id = %s AND is_active = TRUE', (user_id,))
user = cur.fetchone()
has_owner_column = True
except Exception as e:
# If the query fails (likely because is_owner column doesn't exist), rollback and try again
current_app.logger.warning(f"Failed to query with is_owner column, falling back: {e}")
conn.rollback() # Rollback the failed transaction
cur.execute('SELECT id, username, email, is_admin FROM users WHERE id = %s AND is_active = TRUE', (user_id,))
user = cur.fetchone()
has_owner_column = False
if not user:
return jsonify({'message': 'User not found or inactive!'}), 401
# Add user info to request context
request.user = {
'id': user[0],
'username': user[1],
'email': user[2],
'is_admin': user[3],
}
if has_owner_column:
request.user.update({
'is_owner': user[4],
'oidc_managed': user[5] is not None
})
return f(*args, **kwargs)
except Exception as e:
current_app.logger.error(f"Authentication error: {e}")
return jsonify({'message': 'Authentication error!'}), 500
finally:
if conn:
db_handler.release_db_connection(conn)
return decorated
def admin_required(f):
"""Decorator to protect routes that require admin privileges"""
@wraps(f)
@token_required
def decorated(*args, **kwargs):
if not request.user.get('is_admin', False):
current_app.logger.error(f"User {request.user.get('username')} is not an admin")
return jsonify({'message': 'Admin privileges required!'}), 403
return f(*args, **kwargs)
return decorated
def is_valid_email(email):
"""Validate email format"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def is_valid_password(password):
"""Validate password strength"""
# At least 8 characters, 1 uppercase, 1 lowercase, 1 number
if len(password) < 8:
return False
if not re.search(r'[A-Z]', password):
return False
if not re.search(r'[a-z]', password):
return False
if not re.search(r'[0-9]', password):
return False
return True