Files
Warracker/backend/app.py
sassanix f6e11a629e Authentication, Settings Page
Refer to Changelog for all the changes
2025-03-13 15:48:42 -03:00

1926 lines
74 KiB
Python

from flask import Flask, request, jsonify, send_from_directory, session, redirect, url_for
import psycopg2
from psycopg2 import pool
import os
from datetime import datetime, timedelta, date
from werkzeug.utils import secure_filename
from flask_cors import CORS
import logging
import time
from decimal import Decimal
import jwt
from flask_bcrypt import Bcrypt
import re
from functools import wraps
import uuid
app = Flask(__name__)
CORS(app, supports_credentials=True) # Enable CORS with credentials
bcrypt = Bcrypt(app)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Set a secret key for session and JWT
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_secret_key_change_in_production')
app.config['JWT_EXPIRATION_DELTA'] = timedelta(days=7) # Token expiration time
UPLOAD_FOLDER = '/data/uploads'
ALLOWED_EXTENSIONS = {'pdf', 'png', 'jpg', 'jpeg'}
MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16MB max upload
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH
# PostgreSQL connection pool
DB_HOST = os.environ.get('DB_HOST', 'warrackerdb')
DB_NAME = os.environ.get('DB_NAME', 'warranty_db')
DB_USER = os.environ.get('DB_USER', 'warranty_user')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'warranty_password')
DB_ADMIN_USER = os.environ.get('DB_ADMIN_USER', 'warracker_admin')
DB_ADMIN_PASSWORD = os.environ.get('DB_ADMIN_PASSWORD', 'change_this_password_in_production')
# Add connection retry logic
def create_db_pool(max_retries=5, retry_delay=5):
attempt = 0
last_exception = None
while attempt < max_retries:
try:
logger.info(f"Attempting to connect to database (attempt {attempt+1}/{max_retries})")
connection_pool = pool.SimpleConnectionPool(
1, 10, # min, max connections
host=DB_HOST,
database=DB_NAME,
user=DB_USER,
password=DB_PASSWORD
)
logger.info("Database connection successful")
return connection_pool
except Exception as e:
last_exception = e
logger.error(f"Database connection error: {e}")
logger.info(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
attempt += 1
# If we got here, all connection attempts failed
logger.error(f"Failed to connect to database after {max_retries} attempts")
raise last_exception
# Create a connection pool with retry logic
try:
connection_pool = create_db_pool()
except Exception as e:
logger.error(f"Fatal database connection error: {e}")
# Allow the app to start even if DB connection fails
# This lets us serve static files while DB is unavailable
connection_pool = None
def get_db_connection():
try:
if connection_pool is None:
raise Exception("Database connection pool not initialized")
return connection_pool.getconn()
except Exception as e:
logger.error(f"Database connection error: {e}")
raise
def get_admin_db_connection():
"""Get a database connection with admin privileges for user management"""
try:
# Connect using the admin role for administrative tasks
conn = psycopg2.connect(
host=DB_HOST,
database=DB_NAME,
user=DB_ADMIN_USER,
password=DB_ADMIN_PASSWORD
)
return conn
except Exception as e:
logger.error(f"Admin database connection error: {e}")
raise
def release_db_connection(conn):
connection_pool.putconn(conn)
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
# Initialize database
def init_db():
conn = None
try:
conn = get_db_connection()
with conn.cursor() as cur:
# Create warranties table if it doesn't exist
cur.execute('''
CREATE TABLE IF NOT EXISTS warranties (
id SERIAL PRIMARY KEY,
product_name TEXT NOT NULL,
purchase_date DATE NOT NULL,
warranty_years INTEGER NOT NULL,
expiration_date DATE,
invoice_path TEXT,
manual_path TEXT,
product_url TEXT,
purchase_price DECIMAL(10, 2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Add user_id column if it doesn't exist - use DO block to avoid errors
cur.execute('''
DO $$
BEGIN
IF NOT EXISTS (
SELECT column_name
FROM information_schema.columns
WHERE table_name='warranties' AND column_name='user_id'
) THEN
ALTER TABLE warranties ADD COLUMN user_id INTEGER;
END IF;
END $$;
''')
# Add indexes for faster queries
cur.execute('CREATE INDEX IF NOT EXISTS idx_expiration_date ON warranties(expiration_date)')
cur.execute('CREATE INDEX IF NOT EXISTS idx_product_name ON warranties(product_name)')
cur.execute('CREATE INDEX IF NOT EXISTS idx_user_id ON warranties(user_id)')
# Create serial numbers table
cur.execute('''
CREATE TABLE IF NOT EXISTS serial_numbers (
id SERIAL PRIMARY KEY,
warranty_id INTEGER NOT NULL,
serial_number VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (warranty_id) REFERENCES warranties(id) ON DELETE CASCADE
)
''')
# Add indexes for serial numbers
cur.execute('CREATE INDEX IF NOT EXISTS idx_warranty_id ON serial_numbers(warranty_id)')
cur.execute('CREATE INDEX IF NOT EXISTS idx_serial_number ON serial_numbers(serial_number)')
# Create users table
cur.execute('''
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(255) NOT NULL UNIQUE,
email VARCHAR(255) NOT NULL UNIQUE,
password_hash VARCHAR(255) NOT NULL,
first_name VARCHAR(255),
last_name VARCHAR(255),
is_active BOOLEAN DEFAULT TRUE,
is_admin BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login TIMESTAMP
)
''')
# Add indexes for users
cur.execute('CREATE INDEX IF NOT EXISTS idx_username ON users(username)')
cur.execute('CREATE INDEX IF NOT EXISTS idx_email ON users(email)')
# Create password reset tokens table
cur.execute('''
CREATE TABLE IF NOT EXISTS password_reset_tokens (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
token VARCHAR(255) NOT NULL,
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
)
''')
# Add indexes for password reset tokens
cur.execute('CREATE INDEX IF NOT EXISTS idx_token ON password_reset_tokens(token)')
cur.execute('CREATE INDEX IF NOT EXISTS idx_user_id_token ON password_reset_tokens(user_id)')
# Create user sessions table
cur.execute('''
CREATE TABLE IF NOT EXISTS user_sessions (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
session_token VARCHAR(255) NOT NULL,
expires_at TIMESTAMP NOT NULL,
ip_address VARCHAR(45),
user_agent TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
)
''')
# Add indexes for user sessions
cur.execute('CREATE INDEX IF NOT EXISTS idx_session_token ON user_sessions(session_token)')
cur.execute('CREATE INDEX IF NOT EXISTS idx_user_id_session ON user_sessions(user_id)')
conn.commit()
logger.info("Database initialized successfully")
except Exception as e:
logger.error(f"Database initialization error: {e}")
if conn:
conn.rollback()
raise
finally:
if conn:
release_db_connection(conn)
# Authentication helper functions
def generate_token(user_id):
"""Generate a JWT token for the user"""
payload = {
'exp': datetime.utcnow() + app.config['JWT_EXPIRATION_DELTA'],
'iat': datetime.utcnow(),
'sub': user_id
}
return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')
def decode_token(token):
"""Decode a JWT token and return the user_id"""
try:
payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
return payload['sub']
except jwt.ExpiredSignatureError:
return None # Token has expired
except jwt.InvalidTokenError:
return None # Invalid token
def token_required(f):
"""Decorator to protect routes that require authentication"""
@wraps(f)
def decorated(*args, **kwargs):
token = None
# Get token from Authorization header
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
# If no token is provided
if not token:
return jsonify({'message': 'Authentication token is missing!'}), 401
# Decode the token
user_id = decode_token(token)
if not user_id:
return jsonify({'message': 'Invalid or expired token!'}), 401
# Check if user exists
conn = None
try:
conn = get_db_connection()
with conn.cursor() as cur:
cur.execute('SELECT id, username, email, is_admin FROM users WHERE id = %s AND is_active = TRUE', (user_id,))
user = cur.fetchone()
if not user:
return jsonify({'message': 'User not found or inactive!'}), 401
# Add user info to request context
request.user = {
'id': user[0],
'username': user[1],
'email': user[2],
'is_admin': user[3]
}
return f(*args, **kwargs)
except Exception as e:
logger.error(f"Authentication error: {e}")
return jsonify({'message': 'Authentication error!'}), 500
finally:
if conn:
release_db_connection(conn)
return decorated
def admin_required(f):
"""Decorator to protect routes that require admin privileges"""
@wraps(f)
def decorated(*args, **kwargs):
logger.info("Admin required check started")
token_required_result = token_required(lambda: None)()
if isinstance(token_required_result, tuple) and token_required_result[1] != 200:
logger.error(f"Token validation failed: {token_required_result}")
return token_required_result
logger.info(f"User info: {request.user}")
if not request.user.get('is_admin', False):
logger.error(f"User {request.user.get('username')} is not an admin")
return jsonify({'message': 'Admin privileges required!'}), 403
logger.info(f"Admin check passed for user {request.user.get('username')}")
return f(*args, **kwargs)
return decorated
def is_valid_email(email):
"""Validate email format"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def is_valid_password(password):
"""Validate password strength"""
# At least 8 characters, 1 uppercase, 1 lowercase, 1 number
if len(password) < 8:
return False
if not re.search(r'[A-Z]', password):
return False
if not re.search(r'[a-z]', password):
return False
if not re.search(r'[0-9]', password):
return False
return True
# Authentication routes
@app.route('/api/auth/register', methods=['POST'])
def register():
conn = None
try:
data = request.get_json()
# Check if registration is enabled
conn = get_db_connection()
with conn.cursor() as cur:
# Check if settings table exists
cur.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = 'site_settings'
)
""")
table_exists = cur.fetchone()[0]
registration_enabled = True
if table_exists:
# Get registration_enabled setting
cur.execute("SELECT value FROM site_settings WHERE key = 'registration_enabled'")
result = cur.fetchone()
if result:
registration_enabled = result[0].lower() == 'true'
# Check if there are any users (first user can register regardless of setting)
cur.execute('SELECT COUNT(*) FROM users')
user_count = cur.fetchone()[0]
# If registration is disabled and this is not the first user, return error
if not registration_enabled and user_count > 0:
return jsonify({'message': 'Registration is currently disabled!'}), 403
# Validate required fields
required_fields = ['username', 'email', 'password']
for field in required_fields:
if field not in data or not data[field]:
return jsonify({'message': f'{field} is required!'}), 400
username = data['username']
email = data['email']
password = data['password']
first_name = data.get('first_name', '')
last_name = data.get('last_name', '')
# Validate email format
if not is_valid_email(email):
return jsonify({'message': 'Invalid email format!'}), 400
# Validate password strength
if not is_valid_password(password):
return jsonify({'message': 'Password must be at least 8 characters and include uppercase, lowercase, and numbers!'}), 400
# Hash the password
password_hash = bcrypt.generate_password_hash(password).decode('utf-8')
with conn.cursor() as cur:
# Check if username or email already exists
cur.execute('SELECT id FROM users WHERE username = %s OR email = %s', (username, email))
existing_user = cur.fetchone()
if existing_user:
return jsonify({'message': 'Username or email already exists!'}), 409
# Check if this is the first user (who will be an admin)
cur.execute('SELECT COUNT(*) FROM users')
user_count = cur.fetchone()[0]
is_admin = user_count == 0
# Insert new user
cur.execute(
'INSERT INTO users (username, email, password_hash, first_name, last_name, is_admin) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id',
(username, email, password_hash, first_name, last_name, is_admin)
)
user_id = cur.fetchone()[0]
# Generate token
token = generate_token(user_id)
# Update last login
cur.execute('UPDATE users SET last_login = %s WHERE id = %s', (datetime.utcnow(), user_id))
conn.commit()
return jsonify({
'message': 'User registered successfully!',
'token': token,
'user': {
'id': user_id,
'username': username,
'email': email,
'is_admin': is_admin
}
}), 201
except Exception as e:
logger.error(f"Registration error: {e}")
if conn:
conn.rollback()
return jsonify({'message': 'Registration failed!'}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/auth/login', methods=['POST'])
def login():
conn = None
try:
data = request.get_json()
# Validate required fields
if not data.get('username') or not data.get('password'):
return jsonify({'message': 'Username and password are required!'}), 400
username = data['username']
password = data['password']
conn = get_db_connection()
with conn.cursor() as cur:
# Check if user exists
cur.execute('SELECT id, username, email, password_hash, is_active FROM users WHERE username = %s OR email = %s', (username, username))
user = cur.fetchone()
if not user or not bcrypt.check_password_hash(user[3], password):
return jsonify({'message': 'Invalid username or password!'}), 401
if not user[4]: # is_active
return jsonify({'message': 'Account is inactive!'}), 401
user_id = user[0]
# Generate token
token = generate_token(user_id)
# Update last login
cur.execute('UPDATE users SET last_login = %s WHERE id = %s', (datetime.utcnow(), user_id))
# Store session info
ip_address = request.remote_addr
user_agent = request.headers.get('User-Agent', '')
session_token = str(uuid.uuid4())
expires_at = datetime.utcnow() + app.config['JWT_EXPIRATION_DELTA']
cur.execute(
'INSERT INTO user_sessions (user_id, session_token, expires_at, ip_address, user_agent) VALUES (%s, %s, %s, %s, %s)',
(user_id, session_token, expires_at, ip_address, user_agent)
)
conn.commit()
# Check if remember_me is set
remember_me = data.get('remember_me', False)
# Create response
response_data = {
'message': 'Login successful!',
'token': token,
'user': {
'id': user_id,
'username': user[1],
'email': user[2]
}
}
if remember_me:
# Set longer expiration for persistent session
persistent_expires = datetime.utcnow() + timedelta(days=30)
# Update session expiration in database
cur.execute(
'UPDATE user_sessions SET expires_at = %s WHERE session_token = %s',
(persistent_expires, session_token)
)
conn.commit()
# Create response with cookie
response = jsonify(response_data)
# Set secure cookie with 30-day expiration
response.set_cookie(
'session_token',
session_token,
expires=persistent_expires,
httponly=True,
secure=request.is_secure,
samesite='Strict'
)
return response, 200
else:
# Return regular response without cookie
return jsonify(response_data), 200
except Exception as e:
logger.error(f"Login error: {e}")
if conn:
conn.rollback()
return jsonify({'message': 'Login failed!'}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/auth/logout', methods=['POST'])
@token_required
def logout():
conn = None
try:
# Get user ID from token
token = request.headers.get('Authorization').split(' ')[1]
user_id = decode_token(token)
# Invalidate session
conn = get_db_connection()
with conn.cursor() as cur:
cur.execute('DELETE FROM user_sessions WHERE user_id = %s', (user_id,))
conn.commit()
return jsonify({'message': 'Logout successful!'}), 200
except Exception as e:
logger.error(f"Logout error: {e}")
if conn:
conn.rollback()
return jsonify({'message': 'Logout failed!'}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/auth/auto-login', methods=['POST'])
def auto_login():
conn = None
try:
# Get IP address and user agent
ip_address = request.remote_addr
user_agent = request.headers.get('User-Agent', '')
# Check for session cookie
session_cookie = request.cookies.get('session_token')
if not session_cookie:
return jsonify({'message': 'No session found'}), 401
conn = get_db_connection()
with conn.cursor() as cur:
# Find valid session
cur.execute(
'SELECT user_id FROM user_sessions WHERE session_token = %s AND expires_at > %s AND ip_address = %s',
(session_cookie, datetime.utcnow(), ip_address)
)
session = cur.fetchone()
if not session:
return jsonify({'message': 'Invalid or expired session'}), 401
user_id = session[0]
# Get user info
cur.execute('SELECT id, username, email, is_active FROM users WHERE id = %s', (user_id,))
user = cur.fetchone()
if not user or not user[3]: # Check if user exists and is active
return jsonify({'message': 'User not found or inactive'}), 401
# Generate new token
token = generate_token(user_id)
# Update session expiration
new_expires_at = datetime.utcnow() + app.config['JWT_EXPIRATION_DELTA']
cur.execute(
'UPDATE user_sessions SET expires_at = %s WHERE session_token = %s',
(new_expires_at, session_cookie)
)
# Update last login
cur.execute('UPDATE users SET last_login = %s WHERE id = %s', (datetime.utcnow(), user_id))
conn.commit()
# Set session cookie in response
response = jsonify({
'message': 'Auto-login successful!',
'token': token,
'user': {
'id': user[0],
'username': user[1],
'email': user[2]
}
})
# Set secure cookie with same expiration as token
response.set_cookie(
'session_token',
session_cookie,
expires=new_expires_at,
httponly=True,
secure=request.is_secure,
samesite='Strict'
)
return response, 200
except Exception as e:
logger.error(f"Auto-login error: {e}")
if conn:
conn.rollback()
return jsonify({'message': 'Auto-login failed!'}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/auth/validate-token', methods=['GET'])
@token_required
def validate_token():
"""Validate JWT token and return user info"""
try:
# If we got here, the token is valid (token_required decorator validated it)
return jsonify({
'valid': True,
'user': {
'id': request.user['id'],
'username': request.user['username'],
'email': request.user['email'],
'is_admin': request.user['is_admin']
},
'message': 'Token is valid'
}), 200
except Exception as e:
logger.error(f"Token validation error: {e}")
return jsonify({
'valid': False,
'message': 'Invalid token'
}), 401
@app.route('/api/auth/user', methods=['GET'])
@token_required
def get_user():
try:
user = request.user
return jsonify({
'id': user['id'],
'username': user['username'],
'email': user['email'],
'is_admin': user['is_admin']
}), 200
except Exception as e:
logger.error(f"Get user error: {e}")
return jsonify({'message': 'Failed to retrieve user information!'}), 500
@app.route('/api/auth/password/reset-request', methods=['POST'])
def request_password_reset():
conn = None
try:
data = request.get_json()
if not data.get('email'):
return jsonify({'message': 'Email is required!'}), 400
email = data['email']
conn = get_db_connection()
with conn.cursor() as cur:
# Check if user exists
cur.execute('SELECT id FROM users WHERE email = %s', (email,))
user = cur.fetchone()
if not user:
# Don't reveal if email exists or not for security
return jsonify({'message': 'If your email is registered, you will receive a password reset link.'}), 200
user_id = user[0]
# Generate reset token
reset_token = str(uuid.uuid4())
expires_at = datetime.utcnow() + timedelta(hours=24)
# Delete any existing tokens for this user
cur.execute('DELETE FROM password_reset_tokens WHERE user_id = %s', (user_id,))
# Insert new token
cur.execute(
'INSERT INTO password_reset_tokens (user_id, token, expires_at) VALUES (%s, %s, %s)',
(user_id, reset_token, expires_at)
)
conn.commit()
# In a real application, you would send an email with the reset link
# For now, we'll just return the token in the response (for testing purposes)
reset_link = f"/reset-password?token={reset_token}"
logger.info(f"Password reset requested for user {user_id}. Reset link: {reset_link}")
return jsonify({
'message': 'If your email is registered, you will receive a password reset link.',
'reset_link': reset_link # Remove this in production
}), 200
except Exception as e:
logger.error(f"Password reset request error: {e}")
if conn:
conn.rollback()
return jsonify({'message': 'Password reset request failed!'}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/auth/password/reset', methods=['POST'])
def reset_password():
conn = None
try:
data = request.get_json()
if not data.get('token') or not data.get('password'):
return jsonify({'message': 'Token and password are required!'}), 400
token = data['token']
password = data['password']
# Validate password strength
if not is_valid_password(password):
return jsonify({'message': 'Password must be at least 8 characters and include uppercase, lowercase, and numbers!'}), 400
conn = get_db_connection()
with conn.cursor() as cur:
# Check if token exists and is valid
cur.execute('SELECT user_id, expires_at FROM password_reset_tokens WHERE token = %s', (token,))
token_info = cur.fetchone()
if not token_info or token_info[1] < datetime.utcnow():
return jsonify({'message': 'Invalid or expired token!'}), 400
user_id = token_info[0]
# Hash the new password
password_hash = bcrypt.generate_password_hash(password).decode('utf-8')
# Update user's password
cur.execute('UPDATE users SET password_hash = %s WHERE id = %s', (password_hash, user_id))
# Delete the used token
cur.execute('DELETE FROM password_reset_tokens WHERE token = %s', (token,))
conn.commit()
return jsonify({'message': 'Password reset successful!'}), 200
except Exception as e:
logger.error(f"Password reset error: {e}")
if conn:
conn.rollback()
return jsonify({'message': 'Password reset failed!'}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/auth/password/change', methods=['POST'])
@token_required
def change_password():
conn = None
try:
data = request.get_json()
if not data.get('current_password') or not data.get('new_password'):
return jsonify({'message': 'Current password and new password are required!'}), 400
current_password = data['current_password']
new_password = data['new_password']
# Validate password strength
if not is_valid_password(new_password):
return jsonify({'message': 'Password must be at least 8 characters and include uppercase, lowercase, and numbers!'}), 400
user_id = request.user['id']
conn = get_db_connection()
with conn.cursor() as cur:
# Check current password
cur.execute('SELECT password_hash FROM users WHERE id = %s', (user_id,))
user = cur.fetchone()
if not user or not bcrypt.check_password_hash(user[0], current_password):
return jsonify({'message': 'Current password is incorrect!'}), 401
# Hash the new password
password_hash = bcrypt.generate_password_hash(new_password).decode('utf-8')
# Update user's password
cur.execute('UPDATE users SET password_hash = %s WHERE id = %s', (password_hash, user_id))
conn.commit()
return jsonify({'message': 'Password changed successfully!'}), 200
except Exception as e:
logger.error(f"Password change error: {e}")
if conn:
conn.rollback()
return jsonify({'message': 'Password change failed!'}), 500
finally:
if conn:
release_db_connection(conn)
# Update existing endpoints to use authentication
@app.route('/api/warranties', methods=['GET'])
@token_required
def get_warranties():
conn = None
try:
conn = get_db_connection()
user_id = request.user['id']
is_admin = request.user['is_admin']
with conn.cursor() as cur:
# If admin, can see all warranties, otherwise only user's warranties
if is_admin:
cur.execute('SELECT * FROM warranties ORDER BY expiration_date')
else:
cur.execute('SELECT * FROM warranties WHERE user_id = %s ORDER BY expiration_date', (user_id,))
warranties = cur.fetchall()
columns = [desc[0] for desc in cur.description]
warranties_list = []
for row in warranties:
warranty_dict = dict(zip(columns, row))
# Convert date objects to ISO format strings for JSON serialization
for key, value in warranty_dict.items():
if isinstance(value, (datetime, date)):
warranty_dict[key] = value.isoformat()
# Convert Decimal objects to float for JSON serialization
elif isinstance(value, Decimal):
warranty_dict[key] = float(value)
# Get serial numbers for this warranty
warranty_id = warranty_dict['id']
cur.execute('SELECT serial_number FROM serial_numbers WHERE warranty_id = %s', (warranty_id,))
serial_numbers = [row[0] for row in cur.fetchall()]
warranty_dict['serial_numbers'] = serial_numbers
warranties_list.append(warranty_dict)
return jsonify(warranties_list)
except Exception as e:
logger.error(f"Error retrieving warranties: {e}")
return jsonify({"error": "Failed to retrieve warranties"}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/warranties', methods=['POST'])
@token_required
def add_warranty():
conn = None
try:
# Validate input data
if not request.form.get('product_name'):
return jsonify({"error": "Product name is required"}), 400
if not request.form.get('purchase_date'):
return jsonify({"error": "Purchase date is required"}), 400
try:
warranty_years = int(request.form.get('warranty_years', '0'))
if warranty_years <= 0 or warranty_years > 100: # Set reasonable limits
return jsonify({"error": "Warranty years must be between 1 and 100"}), 400
except ValueError:
return jsonify({"error": "Warranty years must be a valid number"}), 400
# Process the data
product_name = request.form['product_name']
purchase_date_str = request.form['purchase_date']
serial_numbers = request.form.getlist('serial_numbers')
product_url = request.form.get('product_url', '')
user_id = request.user['id']
# Handle purchase price (optional)
purchase_price = None
if request.form.get('purchase_price'):
try:
purchase_price = float(request.form.get('purchase_price'))
if purchase_price < 0:
return jsonify({"error": "Purchase price cannot be negative"}), 400
except ValueError:
return jsonify({"error": "Purchase price must be a valid number"}), 400
try:
purchase_date = datetime.strptime(purchase_date_str, '%Y-%m-%d')
except ValueError:
return jsonify({"error": "Invalid date format. Use YYYY-MM-DD"}), 400
expiration_date = purchase_date + timedelta(days=warranty_years * 365)
# Handle invoice file upload
db_invoice_path = None
if 'invoice' in request.files:
invoice = request.files['invoice']
if invoice.filename != '':
if not allowed_file(invoice.filename):
return jsonify({"error": "File type not allowed. Use PDF, PNG, JPG, or JPEG"}), 400
filename = secure_filename(invoice.filename)
# Make filename unique by adding timestamp
filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_{filename}"
invoice_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
# Ensure directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
invoice.save(invoice_path)
db_invoice_path = os.path.join('uploads', filename)
# Handle manual file upload
db_manual_path = None
if 'manual' in request.files:
manual = request.files['manual']
if manual.filename != '':
if not allowed_file(manual.filename):
return jsonify({"error": "File type not allowed. Use PDF, PNG, JPG, or JPEG"}), 400
filename = secure_filename(manual.filename)
# Make filename unique by adding timestamp
filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_manual_{filename}"
manual_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
# Ensure directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
manual.save(manual_path)
db_manual_path = os.path.join('uploads', filename)
# Save to database
conn = get_db_connection()
with conn.cursor() as cur:
# Insert warranty
cur.execute('''
INSERT INTO warranties (product_name, purchase_date, warranty_years, expiration_date, invoice_path, manual_path, product_url, purchase_price, user_id)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
''', (product_name, purchase_date, warranty_years, expiration_date, db_invoice_path, db_manual_path, product_url, purchase_price, user_id))
warranty_id = cur.fetchone()[0]
# Insert serial numbers
if serial_numbers:
for serial_number in serial_numbers:
if serial_number.strip(): # Only insert non-empty serial numbers
cur.execute('''
INSERT INTO serial_numbers (warranty_id, serial_number)
VALUES (%s, %s)
''', (warranty_id, serial_number.strip()))
conn.commit()
return jsonify({
'message': 'Warranty added successfully',
'id': warranty_id
}), 201
except Exception as e:
logger.error(f"Error adding warranty: {e}")
if conn:
conn.rollback()
return jsonify({"error": "Failed to add warranty"}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/warranties/<int:warranty_id>', methods=['DELETE'])
@token_required
def delete_warranty(warranty_id):
conn = None
try:
user_id = request.user['id']
is_admin = request.user['is_admin']
conn = get_db_connection()
with conn.cursor() as cur:
# Check if warranty exists and belongs to the user
if is_admin:
cur.execute('SELECT id FROM warranties WHERE id = %s', (warranty_id,))
else:
cur.execute('SELECT id FROM warranties WHERE id = %s AND user_id = %s', (warranty_id, user_id))
warranty = cur.fetchone()
if not warranty:
return jsonify({"error": "Warranty not found or you don't have permission to delete it"}), 404
# First get the invoice path to delete the file
cur.execute('SELECT invoice_path, manual_path FROM warranties WHERE id = %s', (warranty_id,))
result = cur.fetchone()
invoice_path = result[0]
manual_path = result[1]
# Delete the warranty from database
cur.execute('DELETE FROM warranties WHERE id = %s', (warranty_id,))
deleted_rows = cur.rowcount
conn.commit()
# Delete the invoice file if it exists
if invoice_path:
full_path = os.path.join('/data', invoice_path)
if os.path.exists(full_path):
os.remove(full_path)
# Delete the manual file if it exists
if manual_path:
full_path = os.path.join('/data', manual_path)
if os.path.exists(full_path):
os.remove(full_path)
return jsonify({"message": "Warranty deleted successfully"}), 200
except Exception as e:
logger.error(f"Error deleting warranty: {e}")
if conn:
conn.rollback()
return jsonify({"error": "Failed to delete warranty"}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/warranties/<int:warranty_id>', methods=['PUT'])
@token_required
def update_warranty(warranty_id):
conn = None
try:
user_id = request.user['id']
is_admin = request.user['is_admin']
conn = get_db_connection()
with conn.cursor() as cur:
# Check if warranty exists and belongs to the user
if is_admin:
cur.execute('SELECT id FROM warranties WHERE id = %s', (warranty_id,))
else:
cur.execute('SELECT id FROM warranties WHERE id = %s AND user_id = %s', (warranty_id, user_id))
warranty = cur.fetchone()
if not warranty:
return jsonify({"error": "Warranty not found or you don't have permission to update it"}), 404
# Validate input data similar to the add_warranty route
if not request.form.get('product_name'):
return jsonify({"error": "Product name is required"}), 400
if not request.form.get('purchase_date'):
return jsonify({"error": "Purchase date is required"}), 400
try:
warranty_years = int(request.form.get('warranty_years', '0'))
if warranty_years <= 0 or warranty_years > 100:
return jsonify({"error": "Warranty years must be between 1 and 100"}), 400
except ValueError:
return jsonify({"error": "Warranty years must be a valid number"}), 400
# Process the data
product_name = request.form['product_name']
purchase_date_str = request.form['purchase_date']
serial_numbers = request.form.getlist('serial_numbers')
product_url = request.form.get('product_url', '')
# Handle purchase price (optional)
purchase_price = None
if request.form.get('purchase_price'):
try:
purchase_price = float(request.form.get('purchase_price'))
if purchase_price < 0:
return jsonify({"error": "Purchase price cannot be negative"}), 400
except ValueError:
return jsonify({"error": "Purchase price must be a valid number"}), 400
try:
purchase_date = datetime.strptime(purchase_date_str, '%Y-%m-%d')
except ValueError:
return jsonify({"error": "Invalid date format. Use YYYY-MM-DD"}), 400
expiration_date = purchase_date + timedelta(days=warranty_years * 365)
# Handle invoice file upload if new file is provided
db_invoice_path = None
if 'invoice' in request.files:
invoice = request.files['invoice']
if invoice.filename != '':
if not allowed_file(invoice.filename):
return jsonify({"error": "File type not allowed. Use PDF, PNG, JPG, or JPEG"}), 400
filename = secure_filename(invoice.filename)
filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_{filename}"
invoice_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
invoice.save(invoice_path)
db_invoice_path = os.path.join('uploads', filename)
# Remove old invoice file if exists and different from new one
cur.execute('SELECT invoice_path FROM warranties WHERE id = %s', (warranty_id,))
old_invoice_path = cur.fetchone()[0]
if old_invoice_path and old_invoice_path != db_invoice_path:
old_full_path = os.path.join('/data', old_invoice_path)
if os.path.exists(old_full_path):
os.remove(old_full_path)
# Handle manual file upload if new file is provided
db_manual_path = None
if 'manual' in request.files:
manual = request.files['manual']
if manual.filename != '':
if not allowed_file(manual.filename):
return jsonify({"error": "File type not allowed. Use PDF, PNG, JPG, or JPEG"}), 400
filename = secure_filename(manual.filename)
filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_manual_{filename}"
manual_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
manual.save(manual_path)
db_manual_path = os.path.join('uploads', filename)
# Remove old manual file if exists and different from new one
cur.execute('SELECT manual_path FROM warranties WHERE id = %s', (warranty_id,))
old_manual_path = cur.fetchone()[0]
if old_manual_path and old_manual_path != db_manual_path:
old_full_path = os.path.join('/data', old_manual_path)
if os.path.exists(old_full_path):
os.remove(old_full_path)
# Update the warranty in database
cur.execute('''
UPDATE warranties
SET product_name = %s, purchase_date = %s, warranty_years = %s,
expiration_date = %s, invoice_path = %s, manual_path = %s, product_url = %s, purchase_price = %s
WHERE id = %s
''', (product_name, purchase_date, warranty_years, expiration_date,
db_invoice_path, db_manual_path, product_url, purchase_price, warranty_id))
# Update serial numbers
# First, delete existing serial numbers for this warranty
cur.execute('DELETE FROM serial_numbers WHERE warranty_id = %s', (warranty_id,))
# Then insert the new serial numbers
if serial_numbers:
for serial_number in serial_numbers:
if serial_number.strip(): # Only insert non-empty serial numbers
cur.execute('''
INSERT INTO serial_numbers (warranty_id, serial_number)
VALUES (%s, %s)
''', (warranty_id, serial_number.strip()))
conn.commit()
return jsonify({"message": "Warranty updated successfully"}), 200
except Exception as e:
logger.error(f"Error updating warranty: {e}")
if conn:
conn.rollback()
return jsonify({"error": "Failed to update warranty"}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/statistics', methods=['GET'])
@token_required
def get_statistics():
conn = None
try:
user_id = request.user['id']
is_admin = request.user['is_admin']
conn = get_db_connection()
with conn.cursor() as cur:
# Base query parts
from_clause = 'FROM warranties'
where_clause = ''
params = []
# Add user filtering if not admin
if not is_admin:
where_clause = 'WHERE user_id = %s'
params.append(user_id)
# Calculate expiration ranges
today = date.today()
logger.info(f"Current date: {today}")
thirty_days_later = today + timedelta(days=30)
ninety_days_later = today + timedelta(days=90)
# Get total count
cur.execute(f"SELECT COUNT(*) {from_clause} {where_clause}", params)
total_count = cur.fetchone()[0]
logger.info(f"Total warranties: {total_count}")
# Get active count
active_where = "WHERE" if not where_clause else "AND"
cur.execute(f"SELECT COUNT(*) {from_clause} {where_clause} {active_where if where_clause else 'WHERE'} expiration_date > %s", params + [today])
active_count = cur.fetchone()[0]
logger.info(f"Active warranties: {active_count}")
# Get expired count
cur.execute(f"SELECT COUNT(*) {from_clause} {where_clause} {active_where if where_clause else 'WHERE'} expiration_date <= %s", params + [today])
expired_count = cur.fetchone()[0]
logger.info(f"Expired warranties: {expired_count}")
# Get expiring soon count (30 days)
cur.execute(f"SELECT COUNT(*) {from_clause} {where_clause} {active_where if where_clause else 'WHERE'} expiration_date > %s AND expiration_date <= %s",
params + [today, thirty_days_later])
expiring_soon_count = cur.fetchone()[0]
logger.info(f"Expiring soon warranties: {expiring_soon_count}")
# Get expiration timeline (next 90 days, grouped by month)
cur.execute(f"""
SELECT
EXTRACT(YEAR FROM expiration_date) as year,
EXTRACT(MONTH FROM expiration_date) as month,
COUNT(*) as count
{from_clause}
{where_clause} {active_where if where_clause else 'WHERE'} expiration_date > %s AND expiration_date <= %s
GROUP BY EXTRACT(YEAR FROM expiration_date), EXTRACT(MONTH FROM expiration_date)
ORDER BY year, month
""", params + [today, ninety_days_later])
timeline = []
for row in cur.fetchall():
year = int(row[0])
month = int(row[1])
count = row[2]
timeline.append({
"year": year,
"month": month,
"count": count
})
# Get recent expiring warranties (30 days before and after today)
thirty_days_ago = today - timedelta(days=30)
cur.execute(f"""
SELECT
id, product_name, purchase_date, warranty_years,
expiration_date, invoice_path, manual_path, product_url, purchase_price
{from_clause}
{where_clause} {active_where if where_clause else 'WHERE'} expiration_date >= %s AND expiration_date <= %s
ORDER BY expiration_date
LIMIT 10
""", params + [thirty_days_ago, thirty_days_later])
columns = [desc[0] for desc in cur.description]
recent_warranties = []
for row in cur.fetchall():
warranty = dict(zip(columns, row))
# Convert dates to string format
if warranty['purchase_date']:
warranty['purchase_date'] = warranty['purchase_date'].isoformat()
if warranty['expiration_date']:
warranty['expiration_date'] = warranty['expiration_date'].isoformat()
# Convert Decimal objects to float for JSON serialization
if warranty.get('purchase_price') and isinstance(warranty['purchase_price'], Decimal):
warranty['purchase_price'] = float(warranty['purchase_price'])
recent_warranties.append(warranty)
statistics = {
'total': total_count,
'active': active_count,
'expired': expired_count,
'expiring_soon': expiring_soon_count,
'timeline': timeline,
'recent_warranties': recent_warranties
}
return jsonify(statistics)
except Exception as e:
logger.error(f"Error getting warranty statistics: {e}")
return jsonify({"error": str(e)}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/test', methods=['GET'])
def test_endpoint():
"""Simple test endpoint to check if the API is responding."""
return jsonify({
"status": "success",
"message": "API is working",
"timestamp": datetime.utcnow().isoformat()
})
# Public endpoint to check if authentication is required
@app.route('/api/auth/status', methods=['GET'])
def auth_status():
return jsonify({
"authentication_required": True,
"message": "Authentication is required for most endpoints"
})
@app.route('/api/auth/profile', methods=['PUT'])
@token_required
def update_profile():
user_id = request.user['id']
try:
# Get request data
data = request.get_json()
if not data:
return jsonify({'message': 'No input data provided'}), 400
# Extract fields
first_name = data.get('first_name', '').strip()
last_name = data.get('last_name', '').strip()
# Validate input
if not first_name or not last_name:
return jsonify({'message': 'First name and last name are required'}), 400
# Get database connection
conn = get_db_connection()
cursor = conn.cursor()
try:
# Update user profile
cursor.execute(
"""
UPDATE users
SET first_name = %s, last_name = %s, updated_at = NOW()
WHERE id = %s
RETURNING id, username, email, first_name, last_name, created_at, updated_at
""",
(first_name, last_name, user_id)
)
# Get updated user data
user_data = cursor.fetchone()
if not user_data:
return jsonify({'message': 'User not found'}), 404
# Commit changes
conn.commit()
# Format user data
user = {
'id': user_data[0],
'username': user_data[1],
'email': user_data[2],
'first_name': user_data[3],
'last_name': user_data[4],
'created_at': user_data[5].isoformat() if user_data[5] else None,
'updated_at': user_data[6].isoformat() if user_data[6] else None
}
return jsonify(user), 200
except Exception as e:
conn.rollback()
logger.error(f"Database error in update_profile: {str(e)}")
return jsonify({'message': 'Database error occurred'}), 500
finally:
cursor.close()
release_db_connection(conn)
except Exception as e:
logger.error(f"Error in update_profile: {str(e)}")
return jsonify({'message': 'An error occurred while updating profile'}), 500
@app.route('/api/auth/account', methods=['DELETE'])
@token_required
def delete_account():
user_id = request.user['id']
try:
# Get database connection
conn = get_db_connection()
cursor = conn.cursor()
try:
# Begin transaction
cursor.execute("BEGIN")
# Delete user's warranties
cursor.execute("DELETE FROM warranties WHERE user_id = %s", (user_id,))
# Delete user's reset tokens if any
cursor.execute("DELETE FROM password_reset_tokens WHERE user_id = %s", (user_id,))
# Delete user
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
# Commit transaction
cursor.execute("COMMIT")
return jsonify({'message': 'Account deleted successfully'}), 200
except Exception as e:
cursor.execute("ROLLBACK")
logger.error(f"Database error in delete_account: {str(e)}")
return jsonify({'message': 'Database error occurred'}), 500
finally:
cursor.close()
release_db_connection(conn)
except Exception as e:
logger.error(f"Error in delete_account: {str(e)}")
return jsonify({'message': 'An error occurred while deleting account'}), 500
@app.route('/api/auth/preferences', methods=['GET'])
@token_required
def get_preferences():
user_id = request.user['id']
try:
# Get database connection
conn = get_db_connection()
cursor = conn.cursor()
try:
# Get user preferences
cursor.execute(
"""
SELECT email_notifications, default_view, theme
FROM user_preferences
WHERE user_id = %s
""",
(user_id,)
)
preferences_data = cursor.fetchone()
if not preferences_data:
# Create default preferences if not exists
cursor.execute(
"""
INSERT INTO user_preferences (user_id, email_notifications, default_view, theme)
VALUES (%s, TRUE, 'grid', 'light')
RETURNING email_notifications, default_view, theme
""",
(user_id,)
)
preferences_data = cursor.fetchone()
conn.commit()
# Format preferences data
preferences = {
'email_notifications': preferences_data[0],
'default_view': preferences_data[1],
'theme': preferences_data[2]
}
return jsonify(preferences), 200
except Exception as e:
conn.rollback()
logger.error(f"Database error in get_preferences: {str(e)}")
return jsonify({'message': 'Database error occurred'}), 500
finally:
cursor.close()
release_db_connection(conn)
except Exception as e:
logger.error(f"Error in get_preferences: {str(e)}")
return jsonify({'message': 'An error occurred while getting preferences'}), 500
@app.route('/api/auth/preferences', methods=['PUT'])
@token_required
def update_preferences():
user_id = request.user['id']
try:
# Get request data
data = request.get_json()
if not data:
return jsonify({'message': 'No input data provided'}), 400
# Extract fields
email_notifications = data.get('email_notifications')
default_view = data.get('default_view')
theme = data.get('theme')
# Validate input
if default_view and default_view not in ['grid', 'list', 'table']:
return jsonify({'message': 'Invalid default view'}), 400
if theme and theme not in ['light', 'dark']:
return jsonify({'message': 'Invalid theme'}), 400
# Get database connection
conn = get_db_connection()
cursor = conn.cursor()
try:
# Check if preferences exist
cursor.execute(
"SELECT 1 FROM user_preferences WHERE user_id = %s",
(user_id,)
)
preferences_exist = cursor.fetchone() is not None
if preferences_exist:
# Update existing preferences
update_fields = []
update_values = []
if email_notifications is not None:
update_fields.append("email_notifications = %s")
update_values.append(email_notifications)
if default_view:
update_fields.append("default_view = %s")
update_values.append(default_view)
if theme:
update_fields.append("theme = %s")
update_values.append(theme)
if update_fields:
update_query = f"""
UPDATE user_preferences
SET {', '.join(update_fields)}, updated_at = NOW()
WHERE user_id = %s
RETURNING email_notifications, default_view, theme
"""
cursor.execute(update_query, update_values + [user_id])
preferences_data = cursor.fetchone()
else:
# No fields to update
cursor.execute(
"""
SELECT email_notifications, default_view, theme
FROM user_preferences
WHERE user_id = %s
""",
(user_id,)
)
preferences_data = cursor.fetchone()
else:
# Create new preferences
cursor.execute(
"""
INSERT INTO user_preferences (
user_id,
email_notifications,
default_view,
theme
)
VALUES (%s, %s, %s, %s)
RETURNING email_notifications, default_view, theme
""",
(
user_id,
email_notifications if email_notifications is not None else True,
default_view or 'grid',
theme or 'light'
)
)
preferences_data = cursor.fetchone()
# Commit changes
conn.commit()
# Format preferences data
preferences = {
'email_notifications': preferences_data[0],
'default_view': preferences_data[1],
'theme': preferences_data[2]
}
return jsonify(preferences), 200
except Exception as e:
conn.rollback()
logger.error(f"Database error in update_preferences: {str(e)}")
return jsonify({'message': 'Database error occurred'}), 500
finally:
cursor.close()
release_db_connection(conn)
except Exception as e:
logger.error(f"Error in update_preferences: {str(e)}")
return jsonify({'message': 'An error occurred while updating preferences'}), 500
# Admin User Management Endpoints
@app.route('/api/admin/users', methods=['GET'])
@admin_required
def get_all_users():
"""Get all users (admin only)"""
conn = None
try:
conn = get_db_connection()
with conn.cursor() as cur:
cur.execute('''
SELECT id, username, email, first_name, last_name, is_active, is_admin, created_at, last_login
FROM users
ORDER BY created_at DESC
''')
users = cur.fetchall()
columns = [desc[0] for desc in cur.description]
users_list = []
for row in users:
user_dict = dict(zip(columns, row))
# Convert date objects to ISO format strings for JSON serialization
for key, value in user_dict.items():
if isinstance(value, (datetime, date)):
user_dict[key] = value.isoformat() if value else None
users_list.append(user_dict)
return jsonify(users_list), 200
except Exception as e:
logger.error(f"Error retrieving users: {e}")
return jsonify({"message": "Failed to retrieve users"}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/admin/users/<int:user_id>', methods=['PUT'])
@admin_required
def update_user(user_id):
"""Update user details (admin only)"""
conn = None
try:
# Prevent modifying self
if user_id == request.user['id']:
return jsonify({"message": "Cannot modify your own admin status"}), 403
data = request.get_json()
# Use regular connection since warranty_user now has superuser privileges
conn = get_db_connection()
with conn.cursor() as cur:
# Check if user exists
cur.execute('SELECT id FROM users WHERE id = %s', (user_id,))
user = cur.fetchone()
if not user:
return jsonify({"message": "User not found"}), 404
# Update fields
updates = []
params = []
if 'is_admin' in data:
updates.append('is_admin = %s')
params.append(bool(data['is_admin']))
if 'is_active' in data:
updates.append('is_active = %s')
params.append(bool(data['is_active']))
if not updates:
return jsonify({"message": "No fields to update"}), 400
# Build and execute update query
query = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"
params.append(user_id)
cur.execute(query, params)
conn.commit()
return jsonify({"message": "User updated successfully"}), 200
except Exception as e:
logger.error(f"Error updating user: {e}")
if conn:
conn.rollback()
return jsonify({"message": f"Failed to update user: {str(e)}"}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/admin/users/<int:user_id>', methods=['DELETE'])
@admin_required
def delete_user(user_id):
"""Delete a user (admin only)"""
conn = None
try:
logger.info(f"Delete user request received for user_id: {user_id}")
# Prevent deleting self
if user_id == request.user['id']:
logger.warning(f"User {request.user['username']} attempted to delete their own account")
return jsonify({"message": "Cannot delete your own account through admin API"}), 403
# Use regular connection since warranty_user now has superuser privileges
conn = get_db_connection()
with conn.cursor() as cur:
# Check if user exists
cur.execute('SELECT id, username FROM users WHERE id = %s', (user_id,))
user = cur.fetchone()
if not user:
logger.warning(f"User with ID {user_id} not found")
return jsonify({"message": "User not found"}), 404
logger.info(f"Deleting user {user[1]} (ID: {user[0]})")
# Delete user's warranties first
cur.execute('DELETE FROM warranties WHERE user_id = %s', (user_id,))
warranties_deleted = cur.rowcount
logger.info(f"Deleted {warranties_deleted} warranties belonging to user {user_id}")
# Delete user's password reset tokens if any
cur.execute('DELETE FROM password_reset_tokens WHERE user_id = %s', (user_id,))
tokens_deleted = cur.rowcount
logger.info(f"Deleted {tokens_deleted} password reset tokens belonging to user {user_id}")
# Delete user's sessions if any
cur.execute('DELETE FROM user_sessions WHERE user_id = %s', (user_id,))
sessions_deleted = cur.rowcount
logger.info(f"Deleted {sessions_deleted} sessions belonging to user {user_id}")
# Delete user
cur.execute('DELETE FROM users WHERE id = %s', (user_id,))
user_deleted = cur.rowcount
logger.info(f"Deleted user {user_id}, affected rows: {user_deleted}")
conn.commit()
logger.info(f"User {user_id} deleted successfully")
return jsonify({"message": "User deleted successfully"}), 200
except Exception as e:
logger.error(f"Error deleting user: {e}")
if conn:
conn.rollback()
return jsonify({"message": f"Failed to delete user: {str(e)}"}), 500
finally:
if conn:
release_db_connection(conn)
# Site settings
@app.route('/api/admin/settings', methods=['GET'])
@admin_required
def get_site_settings():
"""Get site settings (admin only)"""
conn = None
try:
conn = get_db_connection()
with conn.cursor() as cur:
# Check if settings table exists
cur.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = 'site_settings'
)
""")
table_exists = cur.fetchone()[0]
# Create settings table if it doesn't exist
if not table_exists:
cur.execute("""
CREATE TABLE site_settings (
key VARCHAR(255) PRIMARY KEY,
value TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
# Get all settings
cur.execute('SELECT key, value FROM site_settings')
settings = {row[0]: row[1] for row in cur.fetchall()}
# Set default values if not present
if 'registration_enabled' not in settings:
settings['registration_enabled'] = 'true'
cur.execute(
'INSERT INTO site_settings (key, value) VALUES (%s, %s)',
('registration_enabled', 'true')
)
conn.commit()
return jsonify(settings), 200
except Exception as e:
logger.error(f"Error retrieving site settings: {e}")
return jsonify({"message": "Failed to retrieve site settings"}), 500
finally:
if conn:
release_db_connection(conn)
@app.route('/api/admin/settings', methods=['PUT'])
@admin_required
def update_site_settings():
"""Update site settings (admin only)"""
conn = None
try:
data = request.get_json()
conn = get_db_connection()
with conn.cursor() as cur:
# Check if settings table exists
cur.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = 'site_settings'
)
""")
table_exists = cur.fetchone()[0]
# Create settings table if it doesn't exist
if not table_exists:
cur.execute("""
CREATE TABLE site_settings (
key VARCHAR(255) PRIMARY KEY,
value TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Update settings
for key, value in data.items():
# Convert boolean to string
if isinstance(value, bool):
value = str(value).lower()
cur.execute("""
INSERT INTO site_settings (key, value, updated_at)
VALUES (%s, %s, CURRENT_TIMESTAMP)
ON CONFLICT (key)
DO UPDATE SET value = %s, updated_at = CURRENT_TIMESTAMP
""", (key, value, value))
conn.commit()
return jsonify({"message": "Settings updated successfully"}), 200
except Exception as e:
logger.error(f"Error updating site settings: {e}")
if conn:
conn.rollback()
return jsonify({"message": "Failed to update site settings"}), 500
finally:
if conn:
release_db_connection(conn)
# Modify the register endpoint to check if registration is enabled
@app.route('/api/auth/registration-status', methods=['GET'])
def check_registration_status():
"""Check if registration is enabled"""
conn = None
try:
conn = get_db_connection()
with conn.cursor() as cur:
# Check if settings table exists
cur.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = 'site_settings'
)
""")
table_exists = cur.fetchone()[0]
if not table_exists:
# If table doesn't exist, registration is enabled by default
return jsonify({"enabled": True}), 200
# Get registration_enabled setting
cur.execute("SELECT value FROM site_settings WHERE key = 'registration_enabled'")
result = cur.fetchone()
if not result:
# If setting doesn't exist, registration is enabled by default
return jsonify({"enabled": True}), 200
registration_enabled = result[0].lower() == 'true'
return jsonify({"enabled": registration_enabled}), 200
except Exception as e:
logger.error(f"Error checking registration status: {e}")
return jsonify({"enabled": True}), 200 # Default to enabled on error
finally:
if conn:
release_db_connection(conn)
# Initialize the database when the application starts
try:
init_db()
logger.info("Database initialized during application startup")
except Exception as e:
logger.error(f"Database initialization error during startup: {e}")
if __name__ == '__main__':
try:
app.run(debug=os.environ.get('FLASK_DEBUG', '0') == '1', host='0.0.0.0')
except Exception as e:
logger.error(f"Application startup error: {e}")