# Source file: Warracker/backend/app.py
# Commit 8165b797d8 by sassanix (2025-03-26 09:27:44 -03:00):
#   "Notifications and QOL" - refer to changelogs
# Size: 2725 lines, 110 KiB, Python

from flask import Flask, request, jsonify, send_from_directory, session, redirect, url_for
import psycopg2
from psycopg2 import pool
import os
from datetime import datetime, timedelta, date
from werkzeug.utils import secure_filename
from flask_cors import CORS
import logging
import time
from decimal import Decimal
import jwt
from flask_bcrypt import Bcrypt
import re
from functools import wraps
import uuid
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
from pytz import timezone as pytz_timezone
import pytz
import threading
# --- Flask application and extensions -------------------------------------
app = Flask(__name__)
CORS(app, supports_credentials=True)  # Enable CORS with credentials
bcrypt = Bcrypt(app)

# --- Logging ---------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Secrets / JWT ---------------------------------------------------------
# The secret key signs both Flask sessions and the JWTs issued by this module.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_secret_key_change_in_production')
if 'SECRET_KEY' not in os.environ:
    # Tokens signed with the well-known fallback key can be forged by anyone,
    # so make the misconfiguration visible in the logs.
    logger.warning(
        "SECRET_KEY is not set; falling back to the insecure development key. "
        "Set SECRET_KEY in production."
    )
app.config['JWT_EXPIRATION_DELTA'] = timedelta(days=7)  # Token expiration time

# --- Uploads ---------------------------------------------------------------
UPLOAD_FOLDER = '/data/uploads'
ALLOWED_EXTENSIONS = {'pdf', 'png', 'jpg', 'jpeg'}
MAX_CONTENT_LENGTH = 32 * 1024 * 1024  # 32MB max upload
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH

# --- PostgreSQL connection settings (overridable via environment) ----------
DB_HOST = os.environ.get('DB_HOST', 'warrackerdb')
DB_NAME = os.environ.get('DB_NAME', 'warranty_db')
DB_USER = os.environ.get('DB_USER', 'warranty_user')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'warranty_password')
DB_ADMIN_USER = os.environ.get('DB_ADMIN_USER', 'warracker_admin')
DB_ADMIN_PASSWORD = os.environ.get('DB_ADMIN_PASSWORD', 'change_this_password_in_production')
# Add connection retry logic
def create_db_pool(max_retries=5, retry_delay=5):
    """Create the PostgreSQL connection pool, retrying on failure.

    Args:
        max_retries: Maximum number of connection attempts (must be >= 1).
        retry_delay: Seconds to wait between two failed attempts.

    Returns:
        A ``psycopg2.pool.SimpleConnectionPool`` holding 1 to 10 connections.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        Exception: The error raised by the last failed attempt.
    """
    if max_retries < 1:
        # Without this guard the loop never runs and we would ``raise None``.
        raise ValueError("max_retries must be at least 1")

    last_exception = None
    for attempt in range(1, max_retries + 1):
        try:
            logger.info(f"Attempting to connect to database (attempt {attempt}/{max_retries})")
            db_pool = pool.SimpleConnectionPool(
                1, 10,  # min, max connections
                host=DB_HOST,
                database=DB_NAME,
                user=DB_USER,
                password=DB_PASSWORD
            )
        except Exception as e:
            last_exception = e
            logger.error(f"Database connection error: {e}")
            # Only wait when another attempt will actually follow.
            if attempt < max_retries:
                logger.info(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
        else:
            logger.info("Database connection successful")
            return db_pool

    # If we got here, all connection attempts failed
    logger.error(f"Failed to connect to database after {max_retries} attempts")
    raise last_exception
# Create a connection pool with retry logic
# Start without a pool; it stays None when the database cannot be reached so
# the app can still start and serve static files while the DB is unavailable.
connection_pool = None
try:
    connection_pool = create_db_pool()
except Exception as pool_error:
    logger.error(f"Fatal database connection error: {pool_error}")
def get_db_connection():
    """Borrow a connection from the module-level pool.

    Logs and re-raises any failure, including the case where the pool was
    never initialised.
    """
    try:
        if connection_pool is not None:
            return connection_pool.getconn()
        raise Exception("Database connection pool not initialized")
    except Exception as exc:
        logger.error(f"Database connection error: {exc}")
        raise
def get_admin_db_connection():
    """Open a dedicated (non-pooled) connection using the admin DB role.

    Intended for user-management tasks. Connection errors are logged and
    re-raised to the caller.
    """
    admin_credentials = {
        'host': DB_HOST,
        'database': DB_NAME,
        'user': DB_ADMIN_USER,
        'password': DB_ADMIN_PASSWORD,
    }
    try:
        return psycopg2.connect(**admin_credentials)
    except Exception as exc:
        logger.error(f"Admin database connection error: {exc}")
        raise
def release_db_connection(conn):
    """Return ``conn`` to the connection pool.

    A ``None`` connection is ignored. If the pool was never initialised the
    connection is closed directly instead of raising ``AttributeError``.
    """
    if conn is None:
        return
    if connection_pool is None:
        # No pool to return the connection to; close it so it is not leaked.
        logger.warning("Connection pool not initialized; closing connection directly")
        conn.close()
        return
    connection_pool.putconn(conn)
def allowed_file(filename):
    """Return True when ``filename`` has an extension listed in ALLOWED_EXTENSIONS."""
    _stem, dot, extension = filename.rpartition('.')
    # ``dot`` is empty when the name contains no '.' at all.
    return dot == '.' and extension.lower() in ALLOWED_EXTENSIONS
# Initialize database
def init_db():
    """Initialize the database with required tables.

    Creates the ``users``, ``user_preferences``, ``warranties`` and
    ``warranty_documents`` tables when missing, adds the ``timezone`` column
    to ``user_preferences`` if absent, and wires the ``warranty_documents.id``
    default to its sequence.

    The pooled connection is always handed back to the pool (it was previously
    closed, which leaked a pool slot), and the transaction is rolled back when
    any statement fails.

    Raises:
        Exception: Any database error, after logging it.
    """
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            # Create users table if it doesn't exist
            cur.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id SERIAL PRIMARY KEY,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    first_name VARCHAR(50),
                    last_name VARCHAR(50),
                    is_active BOOLEAN DEFAULT TRUE,
                    is_admin BOOLEAN DEFAULT FALSE,
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                )
            """)
            # Create user_preferences table if it doesn't exist
            cur.execute("""
                CREATE TABLE IF NOT EXISTS user_preferences (
                    id SERIAL PRIMARY KEY,
                    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
                    email_notifications BOOLEAN NOT NULL DEFAULT TRUE,
                    default_view VARCHAR(10) NOT NULL DEFAULT 'grid',
                    theme VARCHAR(10) NOT NULL DEFAULT 'light',
                    expiring_soon_days INTEGER NOT NULL DEFAULT 30,
                    notification_frequency VARCHAR(10) NOT NULL DEFAULT 'daily',
                    notification_time VARCHAR(5) NOT NULL DEFAULT '09:00',
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                    UNIQUE(user_id)
                )
            """)
            # Check if timezone column exists and add if it doesn't
            cur.execute("""
                DO $$
                BEGIN
                    IF NOT EXISTS (
                        SELECT column_name
                        FROM information_schema.columns
                        WHERE table_name = 'user_preferences' AND column_name = 'timezone'
                    ) THEN
                        ALTER TABLE user_preferences
                        ADD COLUMN timezone VARCHAR(50) NOT NULL DEFAULT 'UTC';
                        RAISE NOTICE 'Added timezone column to user_preferences table';
                    END IF;
                END $$;
            """)
            # Create warranties table if it doesn't exist
            cur.execute("""
                CREATE TABLE IF NOT EXISTS warranties (
                    id SERIAL PRIMARY KEY,
                    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
                    item_name VARCHAR(100) NOT NULL,
                    purchase_date DATE NOT NULL,
                    expiration_date DATE NOT NULL,
                    purchase_price DECIMAL(10,2),
                    serial_number VARCHAR(100),
                    category VARCHAR(50),
                    notes TEXT,
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                )
            """)
            # Create warranty_documents table if it doesn't exist
            cur.execute("""
                CREATE TABLE IF NOT EXISTS warranty_documents (
                    id INTEGER PRIMARY KEY,
                    warranty_id INTEGER NOT NULL REFERENCES warranties(id) ON DELETE CASCADE,
                    file_name VARCHAR(255) NOT NULL,
                    file_path VARCHAR(255) NOT NULL,
                    file_type VARCHAR(50),
                    file_size INTEGER,
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                )
            """)
            # Create sequence if it doesn't exist
            cur.execute("""
                DO $$
                BEGIN
                    IF NOT EXISTS (SELECT 1 FROM pg_sequences WHERE sequencename = 'warranty_documents_id_seq') THEN
                        CREATE SEQUENCE warranty_documents_id_seq;
                    END IF;
                END $$;
            """)
            # Alter table to use the sequence
            cur.execute("""
                ALTER TABLE warranty_documents
                ALTER COLUMN id SET DEFAULT nextval('warranty_documents_id_seq');
            """)
        conn.commit()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Database initialization error: {str(e)}")
        if conn:
            conn.rollback()
        raise
    finally:
        # Hand the connection back to the pool rather than closing it.
        if conn:
            release_db_connection(conn)
# Authentication helper functions
def generate_token(user_id):
    """Generate a signed HS256 JWT for ``user_id``.

    The token carries ``sub`` (the user id), ``iat`` and ``exp`` claims; the
    lifetime comes from ``app.config['JWT_EXPIRATION_DELTA']``.

    Returns:
        The encoded token as ``str``.
    """
    # Read the clock once so ``exp - iat`` equals the configured lifetime exactly.
    issued_at = datetime.utcnow()
    payload = {
        'exp': issued_at + app.config['JWT_EXPIRATION_DELTA'],
        'iat': issued_at,
        'sub': user_id
    }
    token = jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')
    # PyJWT 1.x returns bytes, which jsonify cannot serialise; PyJWT 2.x returns str.
    if isinstance(token, bytes):
        token = token.decode('utf-8')
    return token
def decode_token(token):
    """Decode a JWT and return the user id stored in its ``sub`` claim.

    Returns:
        The ``sub`` claim, or ``None`` when the token is expired, invalid, or
        carries no ``sub`` claim.
    """
    try:
        payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        return None  # Token has expired
    except jwt.InvalidTokenError:
        return None  # Invalid token
    # A validly signed token without ``sub`` used to raise KeyError here, which
    # escaped the decorator as an unhandled 500; treat it as an invalid token.
    return payload.get('sub')
def token_required(f):
    """Decorator to protect routes that require authentication.

    The token is looked up, in order, in the ``Authorization: Bearer`` header,
    the ``auth_token`` form field (POST only) and the ``token`` query
    parameter. On success ``request.user`` is populated with ``id``,
    ``username``, ``email`` and ``is_admin`` before the view runs.

    The pooled connection used for the user lookup is released *before* the
    wrapped view is called, so a request never holds two pool connections at
    once. Exceptions raised by the view are no longer reported as
    "Authentication error!"; they propagate to the view's own handling or to
    Flask.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = None
        # Get token from Authorization header
        auth_header = request.headers.get('Authorization')
        if auth_header and auth_header.startswith('Bearer '):
            token = auth_header.split(' ')[1]
        # If no token in header, check form data for POST requests
        if not token and request.method == 'POST':
            token = request.form.get('auth_token')
        # If still no token, check URL query parameters
        if not token:
            token = request.args.get('token')
        # If no token is provided
        if not token:
            logger.warning(f"Authentication attempt without token: {request.path}")
            return jsonify({'message': 'Authentication token is missing!'}), 401

        # Decode the token
        user_id = decode_token(token)
        if not user_id:
            logger.warning(f"Invalid token used for: {request.path}")
            return jsonify({'message': 'Invalid or expired token!'}), 401

        # Check if user exists and is active
        conn = None
        try:
            conn = get_db_connection()
            with conn.cursor() as cur:
                cur.execute('SELECT id, username, email, is_admin FROM users WHERE id = %s AND is_active = TRUE', (user_id,))
                user = cur.fetchone()
        except Exception as e:
            logger.error(f"Authentication error: {e}")
            return jsonify({'message': 'Authentication error!'}), 500
        finally:
            if conn:
                release_db_connection(conn)

        if not user:
            return jsonify({'message': 'User not found or inactive!'}), 401

        # Add user info to request context
        request.user = {
            'id': user[0],
            'username': user[1],
            'email': user[2],
            'is_admin': user[3]
        }
        return f(*args, **kwargs)
    return decorated
def admin_required(f):
    """Decorator to protect routes that require admin privileges.

    Runs the ``token_required`` checks through a no-op view, then requires
    ``request.user['is_admin']`` to be truthy before calling the real view.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        logger.info("Admin required check started")
        # A failed authentication comes back as a (response, status) tuple;
        # success yields the no-op view's return value (None).
        auth_outcome = token_required(lambda: None)()
        auth_failed = isinstance(auth_outcome, tuple) and auth_outcome[1] != 200
        if auth_failed:
            logger.error(f"Token validation failed: {auth_outcome}")
            return auth_outcome

        logger.info(f"User info: {request.user}")
        if request.user.get('is_admin', False):
            logger.info(f"Admin check passed for user {request.user.get('username')}")
            return f(*args, **kwargs)

        logger.error(f"User {request.user.get('username')} is not an admin")
        return jsonify({'message': 'Admin privileges required!'}), 403
    return decorated
# Compiled once; used with fullmatch() so no explicit ^/$ anchors are needed.
_EMAIL_PATTERN = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')


def is_valid_email(email):
    """Validate email format.

    Returns True only when the *entire* string is a ``local@domain.tld``
    address. Non-string input returns False instead of raising, and a trailing
    newline is rejected (``re.match`` with ``$`` used to accept it).
    """
    if not isinstance(email, str):
        return False
    return _EMAIL_PATTERN.fullmatch(email) is not None
def is_valid_password(password):
    """Validate password strength.

    A password passes when it has at least 8 characters and contains at least
    one uppercase letter, one lowercase letter and one digit.
    """
    if len(password) < 8:
        return False
    required_classes = (r'[A-Z]', r'[a-z]', r'[0-9]')
    return all(re.search(char_class, password) for char_class in required_classes)
def is_valid_timezone(tz):
    """Return True when pytz recognises ``tz`` as a timezone name."""
    try:
        pytz.timezone(tz)
    except pytz.exceptions.UnknownTimeZoneError:
        return False
    else:
        return True
# Authentication routes
@app.route('/api/auth/register', methods=['POST'])
def register():
    """Register a new user and return a JWT for the new account.

    Expects a JSON body with ``username``, ``email`` and ``password``
    (``first_name`` / ``last_name`` optional). The first user ever registered
    becomes an admin and may register even when registration is disabled in
    ``site_settings``.

    Responses: 201 on success, 400 on invalid input (including a missing or
    non-JSON body), 403 when registration is disabled, 409 on a duplicate
    username/email, 500 on unexpected errors.
    """
    conn = None
    try:
        # silent=True: a missing or malformed JSON body becomes an empty dict
        # and is reported as a 400 below instead of crashing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # Check if registration is enabled
        conn = get_db_connection()
        with conn.cursor() as cur:
            # Check if settings table exists
            cur.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_name = 'site_settings'
                )
            """)
            table_exists = cur.fetchone()[0]
            registration_enabled = True
            if table_exists:
                # Get registration_enabled setting
                cur.execute("SELECT value FROM site_settings WHERE key = 'registration_enabled'")
                result = cur.fetchone()
                if result:
                    registration_enabled = result[0].lower() == 'true'
            # Check if there are any users (first user can register regardless of setting)
            cur.execute('SELECT COUNT(*) FROM users')
            user_count = cur.fetchone()[0]

        # If registration is disabled and this is not the first user, return error
        if not registration_enabled and user_count > 0:
            return jsonify({'message': 'Registration is currently disabled!'}), 403

        # Validate required fields
        required_fields = ['username', 'email', 'password']
        for field in required_fields:
            if not data.get(field):
                return jsonify({'message': f'{field} is required!'}), 400

        username = data['username']
        email = data['email']
        password = data['password']
        first_name = data.get('first_name', '')
        last_name = data.get('last_name', '')

        # Validate email format
        if not is_valid_email(email):
            return jsonify({'message': 'Invalid email format!'}), 400
        # Validate password strength
        if not is_valid_password(password):
            return jsonify({'message': 'Password must be at least 8 characters and include uppercase, lowercase, and numbers!'}), 400

        # Hash the password
        password_hash = bcrypt.generate_password_hash(password).decode('utf-8')

        with conn.cursor() as cur:
            # Check if username or email already exists
            cur.execute('SELECT id FROM users WHERE username = %s OR email = %s', (username, email))
            existing_user = cur.fetchone()
            if existing_user:
                return jsonify({'message': 'Username or email already exists!'}), 409

            # Check if this is the first user (who will be an admin)
            cur.execute('SELECT COUNT(*) FROM users')
            user_count = cur.fetchone()[0]
            is_admin = user_count == 0

            # Insert new user
            cur.execute(
                'INSERT INTO users (username, email, password_hash, first_name, last_name, is_admin) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id',
                (username, email, password_hash, first_name, last_name, is_admin)
            )
            user_id = cur.fetchone()[0]

            # Generate token
            token = generate_token(user_id)

            # Update last login
            cur.execute('UPDATE users SET last_login = %s WHERE id = %s', (datetime.utcnow(), user_id))

            # Store session info
            ip_address = request.remote_addr
            user_agent = request.headers.get('User-Agent', '')
            session_token = str(uuid.uuid4())
            expires_at = datetime.utcnow() + app.config['JWT_EXPIRATION_DELTA']
            cur.execute(
                'INSERT INTO user_sessions (user_id, session_token, expires_at, ip_address, user_agent) VALUES (%s, %s, %s, %s, %s)',
                (user_id, session_token, expires_at, ip_address, user_agent)
            )
            conn.commit()

        return jsonify({
            'message': 'User registered successfully!',
            'token': token,
            'user': {
                'id': user_id,
                'username': username,
                'email': email,
                'is_admin': is_admin
            }
        }), 201
    except Exception as e:
        logger.error(f"Registration error: {e}")
        if conn:
            conn.rollback()
        return jsonify({'message': 'Registration failed!'}), 500
    finally:
        if conn:
            release_db_connection(conn)
@app.route('/api/auth/login', methods=['POST'])
def login():
    """Authenticate a user by username or email and return a JWT.

    Expects a JSON body with ``username`` (a username or an email address) and
    ``password``. Records the login time and a row in ``user_sessions``.

    Responses: 200 on success, 400 when credentials are missing (including a
    missing or non-JSON body), 401 on bad credentials or an inactive account,
    500 on unexpected errors.
    """
    conn = None
    try:
        # silent=True: a missing or malformed JSON body becomes an empty dict
        # and is reported as a 400 below instead of crashing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # Validate required fields
        if not data.get('username') or not data.get('password'):
            return jsonify({'message': 'Username and password are required!'}), 400

        username = data['username']
        password = data['password']

        conn = get_db_connection()
        with conn.cursor() as cur:
            # Check if user exists (the identifier may be a username or an email)
            cur.execute('SELECT id, username, email, password_hash, is_active FROM users WHERE username = %s OR email = %s', (username, username))
            user = cur.fetchone()
            if not user or not bcrypt.check_password_hash(user[3], password):
                return jsonify({'message': 'Invalid username or password!'}), 401
            if not user[4]:  # is_active
                return jsonify({'message': 'Account is inactive!'}), 401

            user_id = user[0]

            # Generate token
            token = generate_token(user_id)

            # Update last login
            cur.execute('UPDATE users SET last_login = %s WHERE id = %s', (datetime.utcnow(), user_id))

            # Store session info
            ip_address = request.remote_addr
            user_agent = request.headers.get('User-Agent', '')
            session_token = str(uuid.uuid4())
            expires_at = datetime.utcnow() + app.config['JWT_EXPIRATION_DELTA']
            cur.execute(
                'INSERT INTO user_sessions (user_id, session_token, expires_at, ip_address, user_agent) VALUES (%s, %s, %s, %s, %s)',
                (user_id, session_token, expires_at, ip_address, user_agent)
            )
            conn.commit()

        return jsonify({
            'message': 'Login successful!',
            'token': token,
            'user': {
                'id': user_id,
                'username': user[1],
                'email': user[2]
            }
        }), 200
    except Exception as e:
        logger.error(f"Login error: {e}")
        if conn:
            conn.rollback()
        return jsonify({'message': 'Login failed!'}), 500
    finally:
        if conn:
            release_db_connection(conn)
@app.route('/api/auth/logout', methods=['POST'])
@token_required
def logout():
    """Log the current user out by deleting all of their stored sessions.

    Requires an ``Authorization: Bearer`` header; otherwise responds 400.
    """
    conn = None
    try:
        bearer_header = request.headers.get('Authorization')
        has_bearer = bool(bearer_header) and bearer_header.startswith('Bearer ')
        if not has_bearer:
            return jsonify({'message': 'No token provided!'}), 400

        current_user_id = request.user['id']
        conn = get_db_connection()
        with conn.cursor() as cursor:
            # Invalidate all sessions for this user
            cursor.execute('DELETE FROM user_sessions WHERE user_id = %s', (current_user_id,))
            conn.commit()
        return jsonify({'message': 'Logout successful!'}), 200
    except Exception as exc:
        logger.error(f"Logout error: {exc}")
        if conn:
            conn.rollback()
        return jsonify({'message': 'Logout failed!'}), 500
    finally:
        if conn:
            release_db_connection(conn)
@app.route('/api/auth/validate-token', methods=['GET'])
@token_required
def validate_token():
    """Confirm the caller's JWT is valid and echo back the user's details."""
    try:
        # Reaching this point means token_required already accepted the token.
        current = request.user
        user_info = {
            'id': current['id'],
            'username': current['username'],
            'email': current['email'],
            'is_admin': current['is_admin']
        }
        response_body = {
            'valid': True,
            'user': user_info,
            'message': 'Token is valid'
        }
        return jsonify(response_body), 200
    except Exception as exc:
        logger.error(f"Token validation error: {exc}")
        failure_body = {
            'valid': False,
            'message': 'Invalid token'
        }
        return jsonify(failure_body), 401
@app.route('/api/auth/user', methods=['GET'])
@token_required
def get_user():
    """Return the authenticated user's id, username, email and admin flag."""
    try:
        current = request.user
        exposed_fields = ('id', 'username', 'email', 'is_admin')
        profile = {field: current[field] for field in exposed_fields}
        return jsonify(profile), 200
    except Exception as exc:
        logger.error(f"Get user error: {exc}")
        return jsonify({'message': 'Failed to retrieve user information!'}), 500
@app.route('/api/auth/password/reset-request', methods=['POST'])
def request_password_reset():
    """Create a password-reset token for the account with the given email.

    Expects a JSON body with ``email``. Any previous reset tokens for the user
    are deleted and a new one, valid for 24 hours, is stored.

    Responses: 200 whether or not the email is registered, 400 when the email
    is missing (including a missing or non-JSON body), 500 on unexpected
    errors.
    """
    conn = None
    try:
        # silent=True: a missing or malformed JSON body becomes an empty dict
        # and is reported as a 400 below instead of crashing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        if not data.get('email'):
            return jsonify({'message': 'Email is required!'}), 400

        email = data['email']
        conn = get_db_connection()
        with conn.cursor() as cur:
            # Check if user exists
            cur.execute('SELECT id FROM users WHERE email = %s', (email,))
            user = cur.fetchone()
            if not user:
                # Don't reveal if email exists or not for security
                return jsonify({'message': 'If your email is registered, you will receive a password reset link.'}), 200

            user_id = user[0]

            # Generate reset token
            reset_token = str(uuid.uuid4())
            expires_at = datetime.utcnow() + timedelta(hours=24)

            # Delete any existing tokens for this user
            cur.execute('DELETE FROM password_reset_tokens WHERE user_id = %s', (user_id,))

            # Insert new token
            cur.execute(
                'INSERT INTO password_reset_tokens (user_id, token, expires_at) VALUES (%s, %s, %s)',
                (user_id, reset_token, expires_at)
            )
            conn.commit()

        # In a real application, you would send an email with the reset link
        # For now, we'll just return the token in the response (for testing purposes)
        reset_link = f"/reset-password?token={reset_token}"
        # SECURITY NOTE(review): the reset token is written to the log and
        # returned to the caller, so anyone who knows a registered email can
        # reset that account's password; the extra 'reset_link' key also
        # reveals whether the email is registered. Kept to preserve current
        # behavior - deliver the link by email and drop both before production.
        logger.info(f"Password reset requested for user {user_id}. Reset link: {reset_link}")
        return jsonify({
            'message': 'If your email is registered, you will receive a password reset link.',
            'reset_link': reset_link  # Remove this in production
        }), 200
    except Exception as e:
        logger.error(f"Password reset request error: {e}")
        if conn:
            conn.rollback()
        return jsonify({'message': 'Password reset request failed!'}), 500
    finally:
        if conn:
            release_db_connection(conn)
@app.route('/api/auth/password/reset', methods=['POST'])
def reset_password():
    """Set a new password using a previously issued reset token.

    Expects a JSON body with ``token`` and ``password``. The token is deleted
    once it has been used.

    Responses: 200 on success, 400 on missing fields (including a missing or
    non-JSON body), a weak password, or an invalid/expired token, 500 on
    unexpected errors.
    """
    conn = None
    try:
        # silent=True: a missing or malformed JSON body becomes an empty dict
        # and is reported as a 400 below instead of crashing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        if not data.get('token') or not data.get('password'):
            return jsonify({'message': 'Token and password are required!'}), 400

        token = data['token']
        password = data['password']

        # Validate password strength
        if not is_valid_password(password):
            return jsonify({'message': 'Password must be at least 8 characters and include uppercase, lowercase, and numbers!'}), 400

        conn = get_db_connection()
        with conn.cursor() as cur:
            # Check if token exists and is valid
            cur.execute('SELECT user_id, expires_at FROM password_reset_tokens WHERE token = %s', (token,))
            token_info = cur.fetchone()
            if not token_info:
                return jsonify({'message': 'Invalid or expired token!'}), 400

            user_id, expires_at = token_info
            # The column may come back timezone-aware or naive depending on its
            # type; compare like with like to avoid a TypeError. Naive values
            # are treated as UTC, matching how they are written.
            if expires_at.tzinfo is not None:
                now = datetime.now(expires_at.tzinfo)
            else:
                now = datetime.utcnow()
            if expires_at < now:
                return jsonify({'message': 'Invalid or expired token!'}), 400

            # Hash the new password
            password_hash = bcrypt.generate_password_hash(password).decode('utf-8')

            # Update user's password
            cur.execute('UPDATE users SET password_hash = %s WHERE id = %s', (password_hash, user_id))

            # Delete the used token
            cur.execute('DELETE FROM password_reset_tokens WHERE token = %s', (token,))
            conn.commit()

        return jsonify({'message': 'Password reset successful!'}), 200
    except Exception as e:
        logger.error(f"Password reset error: {e}")
        if conn:
            conn.rollback()
        return jsonify({'message': 'Password reset failed!'}), 500
    finally:
        if conn:
            release_db_connection(conn)
@app.route('/api/auth/password/change', methods=['POST'])
@token_required
def change_password():
    """Change the authenticated user's password.

    Expects a JSON body with ``current_password`` and ``new_password``.

    Responses: 200 on success, 400 on missing fields (including a missing or
    non-JSON body) or a weak new password, 401 when the current password is
    wrong, 500 on unexpected errors.
    """
    conn = None
    try:
        # silent=True: a missing or malformed JSON body becomes an empty dict
        # and is reported as a 400 below instead of crashing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        if not data.get('current_password') or not data.get('new_password'):
            return jsonify({'message': 'Current password and new password are required!'}), 400

        current_password = data['current_password']
        new_password = data['new_password']

        # Validate password strength
        if not is_valid_password(new_password):
            return jsonify({'message': 'Password must be at least 8 characters and include uppercase, lowercase, and numbers!'}), 400

        user_id = request.user['id']
        conn = get_db_connection()
        with conn.cursor() as cur:
            # Check current password
            cur.execute('SELECT password_hash FROM users WHERE id = %s', (user_id,))
            user = cur.fetchone()
            if not user or not bcrypt.check_password_hash(user[0], current_password):
                return jsonify({'message': 'Current password is incorrect!'}), 401

            # Hash the new password
            password_hash = bcrypt.generate_password_hash(new_password).decode('utf-8')

            # Update user's password
            cur.execute('UPDATE users SET password_hash = %s WHERE id = %s', (password_hash, user_id))
            conn.commit()

        return jsonify({'message': 'Password changed successfully!'}), 200
    except Exception as e:
        logger.error(f"Password change error: {e}")
        if conn:
            conn.rollback()
        return jsonify({'message': 'Password change failed!'}), 500
    finally:
        if conn:
            release_db_connection(conn)
# Update existing endpoints to use authentication
@app.route('/api/warranties', methods=['GET'])
@token_required
def get_warranties():
    """List warranties ordered by expiration date.

    Admins receive every warranty; other users only their own. Each entry is
    the warranty row as a dict (dates as ISO strings, decimals as floats) plus
    a ``serial_numbers`` list.

    Serial numbers for all returned warranties are fetched with one batched
    query instead of one query per warranty.
    """
    def _jsonable(value):
        # Dates/datetimes and Decimals are not JSON serialisable as-is.
        if isinstance(value, (datetime, date)):
            return value.isoformat()
        if isinstance(value, Decimal):
            return float(value)
        return value

    conn = None
    try:
        conn = get_db_connection()
        user_id = request.user['id']
        is_admin = request.user['is_admin']
        with conn.cursor() as cur:
            # If admin, can see all warranties, otherwise only user's warranties
            if is_admin:
                cur.execute('SELECT * FROM warranties ORDER BY expiration_date')
            else:
                cur.execute('SELECT * FROM warranties WHERE user_id = %s ORDER BY expiration_date', (user_id,))
            rows = cur.fetchall()
            columns = [desc[0] for desc in cur.description]
            warranties_list = [
                {column: _jsonable(value) for column, value in zip(columns, row)}
                for row in rows
            ]

            # Collect serial numbers for every warranty in a single round trip.
            serials_by_warranty = {}
            warranty_ids = [warranty['id'] for warranty in warranties_list]
            if warranty_ids:
                cur.execute(
                    'SELECT warranty_id, serial_number FROM serial_numbers WHERE warranty_id = ANY(%s)',
                    (warranty_ids,)
                )
                for owner_id, serial_number in cur.fetchall():
                    serials_by_warranty.setdefault(owner_id, []).append(serial_number)

            for warranty in warranties_list:
                warranty['serial_numbers'] = serials_by_warranty.get(warranty['id'], [])

        return jsonify(warranties_list)
    except Exception as e:
        logger.error(f"Error retrieving warranties: {e}")
        return jsonify({"error": "Failed to retrieve warranties"}), 500
    finally:
        if conn:
            release_db_connection(conn)
@app.route('/api/warranties', methods=['POST'])
@token_required
def add_warranty():
    """Create a warranty for the authenticated user from multipart form data.

    Form fields: ``product_name`` and ``purchase_date`` (YYYY-MM-DD) are
    required; ``warranty_years`` must be 1-100; ``purchase_price``,
    ``product_url`` and repeated ``serial_numbers`` are optional. Optional
    file parts ``invoice`` and ``manual`` are stored in the upload folder.

    Files written during the request are removed again when a later
    validation step or the database insert fails, so failed requests do not
    leave orphaned uploads on disk.

    Responses: 201 with the new id, 400 on invalid input, 500 on unexpected
    errors.
    """
    conn = None
    saved_files = []  # absolute paths of uploads written during this request

    def _discard_uploads():
        # Best-effort cleanup; a cleanup failure must not mask the real error.
        for path in saved_files:
            try:
                if os.path.exists(path):
                    os.remove(path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove orphaned upload {path}: {cleanup_error}")

    try:
        # Validate input data
        if not request.form.get('product_name'):
            return jsonify({"error": "Product name is required"}), 400
        if not request.form.get('purchase_date'):
            return jsonify({"error": "Purchase date is required"}), 400
        try:
            warranty_years = int(request.form.get('warranty_years', '0'))
            if warranty_years <= 0 or warranty_years > 100:  # Set reasonable limits
                return jsonify({"error": "Warranty years must be between 1 and 100"}), 400
        except ValueError:
            return jsonify({"error": "Warranty years must be a valid number"}), 400

        # Process the data
        product_name = request.form['product_name']
        purchase_date_str = request.form['purchase_date']
        serial_numbers = request.form.getlist('serial_numbers')
        product_url = request.form.get('product_url', '')
        user_id = request.user['id']

        # Handle purchase price (optional)
        purchase_price = None
        if request.form.get('purchase_price'):
            try:
                purchase_price = float(request.form.get('purchase_price'))
                if purchase_price < 0:
                    return jsonify({"error": "Purchase price cannot be negative"}), 400
            except ValueError:
                return jsonify({"error": "Purchase price must be a valid number"}), 400

        try:
            purchase_date = datetime.strptime(purchase_date_str, '%Y-%m-%d')
        except ValueError:
            return jsonify({"error": "Invalid date format. Use YYYY-MM-DD"}), 400

        # NOTE(review): a year is counted as 365 days, so leap days are ignored.
        expiration_date = purchase_date + timedelta(days=warranty_years * 365)

        # Handle invoice and manual uploads; manuals get a 'manual_' name prefix.
        stored_paths = {'invoice': None, 'manual': None}
        for field, prefix in (('invoice', ''), ('manual', 'manual_')):
            upload = request.files.get(field)
            if upload is None or upload.filename == '':
                continue
            if not allowed_file(upload.filename):
                _discard_uploads()
                return jsonify({"error": "File type not allowed. Use PDF, PNG, JPG, or JPEG"}), 400
            filename = secure_filename(upload.filename)
            filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_{prefix}{filename}"
            full_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            # Register before saving so a partially written file is cleaned up too.
            saved_files.append(full_path)
            upload.save(full_path)
            stored_paths[field] = os.path.join('uploads', filename)
            logger.info(f"New {field} uploaded: {stored_paths[field]}")

        # Save to database
        conn = get_db_connection()
        with conn.cursor() as cur:
            # Insert warranty
            cur.execute('''
                INSERT INTO warranties (product_name, purchase_date, warranty_years, expiration_date, invoice_path, manual_path, product_url, purchase_price, user_id)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                RETURNING id
            ''', (product_name, purchase_date, warranty_years, expiration_date,
                  stored_paths['invoice'], stored_paths['manual'], product_url, purchase_price, user_id))
            warranty_id = cur.fetchone()[0]

            # Insert serial numbers, skipping blank entries
            for serial_number in serial_numbers:
                if serial_number.strip():
                    cur.execute('''
                        INSERT INTO serial_numbers (warranty_id, serial_number)
                        VALUES (%s, %s)
                    ''', (warranty_id, serial_number.strip()))
            conn.commit()

        # The files now belong to a committed row; never delete them from here on.
        saved_files.clear()
        return jsonify({
            'message': 'Warranty added successfully',
            'id': warranty_id
        }), 201
    except Exception as e:
        logger.error(f"Error adding warranty: {e}")
        if conn:
            conn.rollback()
        _discard_uploads()
        return jsonify({"error": "Failed to add warranty"}), 500
    finally:
        if conn:
            release_db_connection(conn)
@app.route('/api/warranties/<int:warranty_id>', methods=['DELETE'])
@token_required
def delete_warranty(warranty_id):
    """Delete a warranty and its stored invoice/manual files.

    Admins may delete any warranty; other users only their own.

    File removal happens after the database commit and is best-effort: a file
    that cannot be removed is logged but no longer turns an already committed
    deletion into a 500 response.

    Responses: 200 on success, 404 when the warranty does not exist or is not
    accessible to the caller, 500 on unexpected errors.
    """
    conn = None
    try:
        user_id = request.user['id']
        is_admin = request.user['is_admin']
        conn = get_db_connection()
        with conn.cursor() as cur:
            # One query both checks access and fetches the file paths to clean up.
            if is_admin:
                cur.execute('SELECT invoice_path, manual_path FROM warranties WHERE id = %s', (warranty_id,))
            else:
                cur.execute('SELECT invoice_path, manual_path FROM warranties WHERE id = %s AND user_id = %s', (warranty_id, user_id))
            stored_paths = cur.fetchone()
            if stored_paths is None:
                return jsonify({"error": "Warranty not found or you don't have permission to delete it"}), 404

            # Delete the warranty from database
            cur.execute('DELETE FROM warranties WHERE id = %s', (warranty_id,))
            conn.commit()

        # Delete the invoice and manual files if they exist
        for stored_path in stored_paths:
            if not stored_path:
                continue
            full_path = os.path.join('/data', stored_path)
            try:
                if os.path.exists(full_path):
                    os.remove(full_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove file {full_path}: {cleanup_error}")

        return jsonify({"message": "Warranty deleted successfully"}), 200
    except Exception as e:
        logger.error(f"Error deleting warranty: {e}")
        if conn:
            conn.rollback()
        return jsonify({"error": "Failed to delete warranty"}), 500
    finally:
        if conn:
            release_db_connection(conn)
@app.route('/api/warranties/<int:warranty_id>', methods=['PUT'])
@token_required
def update_warranty(warranty_id):
    """Update a warranty from multipart form data.

    Admins may update any warranty; other users only their own. Form fields
    follow the same rules as warranty creation. A newly uploaded ``invoice``
    or ``manual`` replaces the stored one; when no new file is sent the
    existing path is left untouched. Serial numbers are replaced wholesale.

    Replaced files are removed from disk only *after* the database commit,
    and files uploaded during a request that fails are removed again, so the
    database never points at a deleted file and failed requests leave no
    orphaned uploads.

    Responses: 200 on success, 400 on invalid input, 404 when the warranty
    does not exist or is not accessible to the caller, 500 on unexpected
    errors.
    """
    conn = None
    new_files = []    # absolute paths written during this request
    stale_files = []  # absolute paths of replaced files, removed after commit

    def _remove_quietly(paths):
        # Best-effort removal; failures are logged, never raised.
        for path in paths:
            try:
                if os.path.exists(path):
                    os.remove(path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove file {path}: {cleanup_error}")

    try:
        user_id = request.user['id']
        is_admin = request.user['is_admin']
        conn = get_db_connection()
        with conn.cursor() as cur:
            # Check access and fetch the currently stored file paths in one query.
            if is_admin:
                cur.execute('SELECT invoice_path, manual_path FROM warranties WHERE id = %s', (warranty_id,))
            else:
                cur.execute('SELECT invoice_path, manual_path FROM warranties WHERE id = %s AND user_id = %s', (warranty_id, user_id))
            existing = cur.fetchone()
            if existing is None:
                return jsonify({"error": "Warranty not found or you don't have permission to update it"}), 404
            existing_paths = {'invoice': existing[0], 'manual': existing[1]}

            # Validate input data similar to the add_warranty route
            if not request.form.get('product_name'):
                return jsonify({"error": "Product name is required"}), 400
            if not request.form.get('purchase_date'):
                return jsonify({"error": "Purchase date is required"}), 400
            try:
                warranty_years = int(request.form.get('warranty_years', '0'))
                if warranty_years <= 0 or warranty_years > 100:
                    return jsonify({"error": "Warranty years must be between 1 and 100"}), 400
            except ValueError:
                return jsonify({"error": "Warranty years must be a valid number"}), 400

            # Process the data
            product_name = request.form['product_name']
            purchase_date_str = request.form['purchase_date']
            serial_numbers = request.form.getlist('serial_numbers')
            product_url = request.form.get('product_url', '')

            # Handle purchase price (optional)
            purchase_price = None
            if request.form.get('purchase_price'):
                try:
                    purchase_price = float(request.form.get('purchase_price'))
                    if purchase_price < 0:
                        return jsonify({"error": "Purchase price cannot be negative"}), 400
                except ValueError:
                    return jsonify({"error": "Purchase price must be a valid number"}), 400

            try:
                purchase_date = datetime.strptime(purchase_date_str, '%Y-%m-%d')
            except ValueError:
                return jsonify({"error": "Invalid date format. Use YYYY-MM-DD"}), 400

            # NOTE(review): a year is counted as 365 days, so leap days are ignored.
            expiration_date = purchase_date + timedelta(days=warranty_years * 365)

            # Handle invoice and manual uploads; manuals get a 'manual_' name prefix.
            uploaded_paths = {}
            for field, prefix in (('invoice', ''), ('manual', 'manual_')):
                upload = request.files.get(field)
                if upload is None or upload.filename == '':
                    # No new file: the stored path is left as it is.
                    logger.info(f"Preserving existing {field}: {existing_paths[field] or 'None'}")
                    continue
                if not allowed_file(upload.filename):
                    _remove_quietly(new_files)
                    return jsonify({"error": "File type not allowed. Use PDF, PNG, JPG, or JPEG"}), 400
                filename = secure_filename(upload.filename)
                filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_{prefix}{filename}"
                full_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
                # Register before saving so a partially written file is cleaned up too.
                new_files.append(full_path)
                upload.save(full_path)
                uploaded_paths[field] = os.path.join('uploads', filename)
                logger.info(f"New {field} uploaded: {uploaded_paths[field]}")
                # Schedule the replaced file for removal once the update is committed.
                old_path = existing_paths[field]
                if old_path and old_path != uploaded_paths[field]:
                    stale_files.append(os.path.join('/data', old_path))

            # Build the UPDATE from the always-present columns plus any new file paths.
            update_params = {
                'product_name': product_name,
                'purchase_date': purchase_date,
                'warranty_years': warranty_years,
                'expiration_date': expiration_date,
                'product_url': product_url,
                'purchase_price': purchase_price
            }
            for field, db_path in uploaded_paths.items():
                update_params[f"{field}_path"] = db_path

            # Column names come from the fixed keys above, never from user input.
            sql_fields = [f"{column} = %s" for column in update_params]
            sql_values = list(update_params.values())
            sql_values.append(warranty_id)

            update_sql = f"UPDATE warranties SET {', '.join(sql_fields)} WHERE id = %s"
            cur.execute(update_sql, sql_values)
            logger.info(f"Updated warranty with SQL: {update_sql}")
            logger.info(f"Parameters: {sql_values}")

            # Replace serial numbers: delete the old set, insert the non-blank new ones.
            cur.execute('DELETE FROM serial_numbers WHERE warranty_id = %s', (warranty_id,))
            for serial_number in serial_numbers:
                if serial_number.strip():
                    cur.execute('''
                        INSERT INTO serial_numbers (warranty_id, serial_number)
                        VALUES (%s, %s)
                    ''', (warranty_id, serial_number.strip()))
            conn.commit()

        # The new files now belong to a committed row; only the replaced ones go.
        new_files.clear()
        if stale_files:
            _remove_quietly(stale_files)
            logger.info(f"Removed old files: {stale_files}")

        return jsonify({"message": "Warranty updated successfully"}), 200
    except Exception as e:
        logger.error(f"Error updating warranty: {e}")
        if conn:
            conn.rollback()
        _remove_quietly(new_files)
        return jsonify({"error": "Failed to update warranty"}), 500
    finally:
        if conn:
            release_db_connection(conn)
@app.route('/api/statistics', methods=['GET'])
@token_required
def get_statistics():
    """Return warranty statistics for the dashboard.

    Admin users get figures over every row in ``warranties``; other users
    only get warranties joined to them through ``warranty_users``.

    Returns:
        A JSON object with the keys ``total``, ``active``, ``expired``,
        ``expiring_soon``, ``timeline`` (expirations in the next 90 days
        grouped by year/month) and ``recent_warranties`` (up to 10 warranties
        expiring between 30 days ago and the "expiring soon" cutoff).
        On failure, ``{"error": ...}`` with HTTP 500.
    """
    user_id = request.user['id']
    conn = None

    try:
        conn = get_db_connection()

        # Size (in days) of the "expiring soon" window; replaced by the
        # user's stored preference when one can be read.
        expiring_soon_days = 30

        try:
            # Check if user_preferences table exists before trying to query it
            with conn.cursor() as check_cur:
                check_cur.execute("""
                    SELECT EXISTS (
                        SELECT FROM information_schema.tables
                        WHERE table_name = 'user_preferences'
                    )
                """)
                table_exists = check_cur.fetchone()[0]

            if table_exists:
                with conn.cursor() as pref_cur:
                    pref_cur.execute(
                        """
                        SELECT expiring_soon_days FROM user_preferences
                        WHERE user_id = %s
                        """,
                        (user_id,)
                    )
                    preference = pref_cur.fetchone()
                    if preference:
                        expiring_soon_days = preference[0]
        except Exception as e:
            logger.error(f"Error getting user preferences: {e}")
            # A failed statement leaves the transaction aborted; without a
            # rollback every query below would fail as well, defeating the
            # "continue with default value" fallback.
            conn.rollback()

        today = date.today()
        expiring_soon_cutoff = today + timedelta(days=expiring_soon_days)
        ninety_days_later = today + timedelta(days=90)
        thirty_days_ago = today - timedelta(days=30)

        # Build the SQL fragments based on user role
        from_clause = "FROM warranties"
        where_clause = ""
        params = []

        if not request.user.get('is_admin', False):
            # NOTE(review): other handlers in this file filter on
            # warranties.user_id directly; confirm that warranty_users exists
            # and that the unqualified column names below stay unambiguous
            # under this join.
            from_clause = "FROM warranties w JOIN warranty_users wu ON w.id = wu.warranty_id"
            where_clause = "WHERE wu.user_id = %s"
            params = [user_id]

        # Prefix placed in front of each extra date condition: extends the
        # existing WHERE with AND, or opens a new WHERE for admins.
        condition_prefix = f"{where_clause} AND" if where_clause else "WHERE"

        with conn.cursor() as cur:
            # Get total count
            cur.execute(f"SELECT COUNT(*) {from_clause} {where_clause}", params)
            total_count = cur.fetchone()[0]
            logger.info(f"Total warranties: {total_count}")

            # Get active count
            cur.execute(
                f"SELECT COUNT(*) {from_clause} {condition_prefix} expiration_date > %s",
                params + [today]
            )
            active_count = cur.fetchone()[0]
            logger.info(f"Active warranties: {active_count}")

            # Get expired count
            cur.execute(
                f"SELECT COUNT(*) {from_clause} {condition_prefix} expiration_date <= %s",
                params + [today]
            )
            expired_count = cur.fetchone()[0]
            logger.info(f"Expired warranties: {expired_count}")

            # Get expiring soon count (using user preference)
            cur.execute(
                f"SELECT COUNT(*) {from_clause} {condition_prefix} "
                f"expiration_date > %s AND expiration_date <= %s",
                params + [today, expiring_soon_cutoff]
            )
            expiring_soon_count = cur.fetchone()[0]
            logger.info(f"Expiring soon warranties: {expiring_soon_count}")

            # Get expiration timeline (next 90 days, grouped by month)
            cur.execute(f"""
                SELECT
                    EXTRACT(YEAR FROM expiration_date) as year,
                    EXTRACT(MONTH FROM expiration_date) as month,
                    COUNT(*) as count
                {from_clause}
                {condition_prefix} expiration_date > %s AND expiration_date <= %s
                GROUP BY EXTRACT(YEAR FROM expiration_date), EXTRACT(MONTH FROM expiration_date)
                ORDER BY year, month
            """, params + [today, ninety_days_later])

            timeline = [
                {"year": int(row[0]), "month": int(row[1]), "count": row[2]}
                for row in cur.fetchall()
            ]

            # Get recent expiring warranties (30 days back up to the
            # "expiring soon" cutoff)
            cur.execute(f"""
                SELECT
                    id, product_name, purchase_date, warranty_years,
                    expiration_date, invoice_path, manual_path, product_url, purchase_price
                {from_clause}
                {condition_prefix} expiration_date >= %s AND expiration_date <= %s
                ORDER BY expiration_date
                LIMIT 10
            """, params + [thirty_days_ago, expiring_soon_cutoff])

            columns = [desc[0] for desc in cur.description]
            recent_warranties = []

            for row in cur.fetchall():
                warranty = dict(zip(columns, row))

                # Convert dates to string format
                if warranty['purchase_date']:
                    warranty['purchase_date'] = warranty['purchase_date'].isoformat()
                if warranty['expiration_date']:
                    warranty['expiration_date'] = warranty['expiration_date'].isoformat()

                # Convert Decimal to float for JSON serialization. Checked by
                # type only so that a price of exactly 0 is converted too.
                price = warranty.get('purchase_price')
                if isinstance(price, Decimal):
                    warranty['purchase_price'] = float(price)

                recent_warranties.append(warranty)

            statistics = {
                'total': total_count,
                'active': active_count,
                'expired': expired_count,
                'expiring_soon': expiring_soon_count,
                'timeline': timeline,
                'recent_warranties': recent_warranties
            }

            return jsonify(statistics)

    except Exception as e:
        logger.error(f"Error getting warranty statistics: {e}")
        return jsonify({"error": str(e)}), 500
    finally:
        if conn:
            release_db_connection(conn)
@app.route('/api/test', methods=['GET'])
def test_endpoint():
    """Unauthenticated probe that reports the API is up, with a UTC timestamp."""
    payload = dict(
        status="success",
        message="API is working",
        timestamp=datetime.utcnow().isoformat(),
    )
    return jsonify(payload)
# Public (no token needed) endpoint telling clients that authentication is on
@app.route('/api/auth/status', methods=['GET'])
def auth_status():
    """Report, as a static JSON payload, that the API requires authentication."""
    status_payload = dict(
        authentication_required=True,
        message="Authentication is required for most endpoints",
    )
    return jsonify(status_payload)
@app.route('/api/auth/profile', methods=['PUT'])
@token_required
def update_profile():
    """Update the authenticated user's first and last name.

    Expects a JSON object with non-empty string fields ``first_name`` and
    ``last_name``.

    Returns:
        200 with the updated user record, 400 when the body is missing,
        is not a JSON object or lacks a usable name, 404 when the user row
        does not exist, 500 on unexpected errors.
    """
    user_id = request.user['id']

    try:
        # silent=True: a missing or malformed JSON body becomes None and is
        # answered with 400 below instead of bubbling up as a 500.
        data = request.get_json(silent=True)

        if not data or not isinstance(data, dict):
            return jsonify({'message': 'No input data provided'}), 400

        # Non-string values (null, numbers, ...) are treated as missing
        # rather than crashing on .strip().
        raw_first_name = data.get('first_name')
        raw_last_name = data.get('last_name')
        first_name = raw_first_name.strip() if isinstance(raw_first_name, str) else ''
        last_name = raw_last_name.strip() if isinstance(raw_last_name, str) else ''

        if not first_name or not last_name:
            return jsonify({'message': 'First name and last name are required'}), 400

        conn = get_db_connection()

        try:
            # The cursor is created inside the try so the connection is
            # released even if cursor creation fails.
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    UPDATE users
                    SET first_name = %s, last_name = %s, updated_at = NOW()
                    WHERE id = %s
                    RETURNING id, username, email, first_name, last_name, created_at, updated_at
                    """,
                    (first_name, last_name, user_id)
                )

                user_data = cursor.fetchone()

            if not user_data:
                return jsonify({'message': 'User not found'}), 404

            conn.commit()

            user = {
                'id': user_data[0],
                'username': user_data[1],
                'email': user_data[2],
                'first_name': user_data[3],
                'last_name': user_data[4],
                'created_at': user_data[5].isoformat() if user_data[5] else None,
                'updated_at': user_data[6].isoformat() if user_data[6] else None
            }

            return jsonify(user), 200
        except Exception as e:
            conn.rollback()
            logger.error(f"Database error in update_profile: {str(e)}")
            return jsonify({'message': 'Database error occurred'}), 500
        finally:
            release_db_connection(conn)
    except Exception as e:
        logger.error(f"Error in update_profile: {str(e)}")
        return jsonify({'message': 'An error occurred while updating profile'}), 500
@app.route('/api/auth/account', methods=['DELETE'])
@token_required
def delete_account():
    """Delete the authenticated user's account and dependent rows.

    Removes the user's warranties and password reset tokens, then the user
    row itself, all in one transaction.

    Returns:
        200 on success, 500 (after rolling back) on any failure.
    """
    user_id = request.user['id']

    try:
        conn = get_db_connection()

        try:
            # psycopg2 opens a transaction implicitly on the first statement,
            # so the transaction is ended through the connection object
            # rather than by sending BEGIN/COMMIT/ROLLBACK as SQL text.
            with conn.cursor() as cursor:
                # Delete user's warranties
                cursor.execute("DELETE FROM warranties WHERE user_id = %s", (user_id,))

                # Delete user's reset tokens if any
                cursor.execute("DELETE FROM password_reset_tokens WHERE user_id = %s", (user_id,))

                # Delete user
                cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))

            conn.commit()

            return jsonify({'message': 'Account deleted successfully'}), 200
        except Exception as e:
            conn.rollback()
            logger.error(f"Database error in delete_account: {str(e)}")
            return jsonify({'message': 'Database error occurred'}), 500
        finally:
            release_db_connection(conn)
    except Exception as e:
        logger.error(f"Error in delete_account: {str(e)}")
        return jsonify({'message': 'An error occurred while deleting account'}), 500
@app.route('/api/auth/preferences', methods=['GET'])
@token_required
def get_preferences():
    """Return the authenticated user's preferences.

    Creates the ``user_preferences`` table and/or the user's default row on
    demand. Any error is logged and answered with the built-in defaults and
    HTTP 200, so the caller always receives a usable preferences object.
    """
    user_id = request.user['id']

    # Order matches the column order of the SELECT / RETURNING lists below.
    pref_keys = (
        'email_notifications',
        'default_view',
        'theme',
        'expiring_soon_days',
        'notification_frequency',
        'notification_time',
        'timezone',
    )
    # Served whenever the database cannot provide real values.
    fallback_values = (True, 'grid', 'light', 30, 'daily', '09:00', 'UTC')

    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        try:
            # Does the preferences table exist yet?
            cursor.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_name = 'user_preferences'
                )
            """)
            if not cursor.fetchone()[0]:
                # First use: create the table ...
                cursor.execute("""
                    CREATE TABLE user_preferences (
                        id SERIAL PRIMARY KEY,
                        user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
                        email_notifications BOOLEAN NOT NULL DEFAULT TRUE,
                        default_view VARCHAR(10) NOT NULL DEFAULT 'grid',
                        theme VARCHAR(10) NOT NULL DEFAULT 'light',
                        expiring_soon_days INTEGER NOT NULL DEFAULT 30,
                        notification_frequency VARCHAR(10) NOT NULL DEFAULT 'daily',
                        notification_time VARCHAR(5) NOT NULL DEFAULT '09:00',
                        timezone VARCHAR(50) NOT NULL DEFAULT 'UTC',
                        created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        UNIQUE(user_id)
                    )
                """)
                # ... and its lookup index
                cursor.execute("""
                    CREATE INDEX idx_user_preferences_user_id ON user_preferences(user_id)
                """)
                conn.commit()
                logger.info("Created user_preferences table")

            # Load the stored row for this user
            cursor.execute(
                """
                SELECT email_notifications, default_view, theme, expiring_soon_days, notification_frequency, notification_time, timezone
                FROM user_preferences
                WHERE user_id = %s
                """,
                (user_id,)
            )
            row = cursor.fetchone()

            if not row:
                # No row yet: persist the defaults and read them back
                cursor.execute(
                    """
                    INSERT INTO user_preferences (user_id, email_notifications, default_view, theme, expiring_soon_days, notification_frequency, notification_time, timezone)
                    VALUES (%s, TRUE, 'grid', 'light', 30, 'daily', '09:00', 'UTC')
                    RETURNING email_notifications, default_view, theme, expiring_soon_days, notification_frequency, notification_time, timezone
                    """,
                    (user_id,)
                )
                row = cursor.fetchone()
                conn.commit()

            # Pair the seven selected columns with their JSON keys
            stored = {key: row[position] for position, key in enumerate(pref_keys)}
            return jsonify(stored), 200
        except Exception as e:
            conn.rollback()
            logger.error(f"Database error in get_preferences: {str(e)}")
            # Best effort: answer with defaults instead of an error
            return jsonify(dict(zip(pref_keys, fallback_values))), 200
        finally:
            cursor.close()
            release_db_connection(conn)
    except Exception as e:
        logger.error(f"Error in get_preferences: {str(e)}")
        # Best effort: answer with defaults instead of an error
        return jsonify(dict(zip(pref_keys, fallback_values))), 200
@app.route('/api/auth/preferences', methods=['PUT'])
@token_required
def update_preferences():
    """Create or update the authenticated user's preferences.

    Accepts a JSON object with any of: ``email_notifications``,
    ``default_view`` (grid/list/table), ``theme`` (light/dark),
    ``expiring_soon_days`` (1-365), ``notification_frequency``
    (daily/weekly/monthly), ``notification_time`` (HH:MM) and ``timezone``.
    Omitted fields keep their stored value, or take the default when the
    user's row is first created.

    Returns:
        200 with the resulting preferences, 400 on invalid input.
        Database and unexpected errors are logged and answered with a
        best-effort fallback payload and HTTP 200.
    """
    user_id = request.user['id']

    try:
        data = request.get_json()

        # A JSON array or scalar has no fields to read; reject it up front.
        if not data or not isinstance(data, dict):
            return jsonify({'message': 'No input data provided'}), 400

        # Extract fields
        email_notifications = data.get('email_notifications')
        default_view = data.get('default_view')
        theme = data.get('theme')
        expiring_soon_days = data.get('expiring_soon_days')
        notification_frequency = data.get('notification_frequency')
        notification_time = data.get('notification_time')
        timezone = data.get('timezone')

        # Validate input
        if default_view and default_view not in ['grid', 'list', 'table']:
            return jsonify({'message': 'Invalid default view'}), 400

        if theme and theme not in ['light', 'dark']:
            return jsonify({'message': 'Invalid theme'}), 400

        if expiring_soon_days is not None:
            try:
                expiring_soon_days = int(expiring_soon_days)
            except (TypeError, ValueError):
                # TypeError covers JSON arrays/objects, which int() rejects
                # with a different exception than malformed strings.
                return jsonify({'message': 'Expiring soon days must be a valid number'}), 400
            if expiring_soon_days < 1 or expiring_soon_days > 365:
                return jsonify({'message': 'Expiring soon days must be between 1 and 365'}), 400

        if notification_frequency and notification_frequency not in ['daily', 'weekly', 'monthly']:
            return jsonify({'message': 'Invalid notification frequency'}), 400

        # fullmatch (unlike match with '$') also rejects a trailing newline,
        # which would not fit the VARCHAR(5) column.
        if notification_time and not (
            isinstance(notification_time, str)
            and re.fullmatch(r'([01]?[0-9]|2[0-3]):[0-5][0-9]', notification_time)
        ):
            return jsonify({'message': 'Invalid notification time format'}), 400

        if timezone and not is_valid_timezone(timezone):
            return jsonify({'message': 'Invalid timezone'}), 400

        # Get database connection
        conn = get_db_connection()
        cursor = conn.cursor()

        try:
            # Check if timezone column exists in user_preferences
            cursor.execute("""
                SELECT column_name
                FROM information_schema.columns
                WHERE table_name = 'user_preferences' AND column_name = 'timezone'
            """)
            has_timezone_column = cursor.fetchone() is not None

            # Check if user_preferences table exists
            cursor.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_name = 'user_preferences'
                )
            """)
            table_exists = cursor.fetchone()[0]

            if not table_exists:
                # Create the user_preferences table
                cursor.execute("""
                    CREATE TABLE user_preferences (
                        id SERIAL PRIMARY KEY,
                        user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
                        email_notifications BOOLEAN NOT NULL DEFAULT TRUE,
                        default_view VARCHAR(10) NOT NULL DEFAULT 'grid',
                        theme VARCHAR(10) NOT NULL DEFAULT 'light',
                        expiring_soon_days INTEGER NOT NULL DEFAULT 30,
                        notification_frequency VARCHAR(10) NOT NULL DEFAULT 'daily',
                        notification_time VARCHAR(5) NOT NULL DEFAULT '09:00',
                        timezone VARCHAR(50) NOT NULL DEFAULT 'UTC',
                        created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        UNIQUE(user_id)
                    )
                """)

                # Add index for faster lookups
                cursor.execute("""
                    CREATE INDEX idx_user_preferences_user_id ON user_preferences(user_id)
                """)

                # A brand-new table has no row for this user yet
                preferences_exist = False
                # The column check above ran before the table existed; the
                # table just created does have a timezone column, so the
                # submitted timezone must be stored rather than dropped.
                has_timezone_column = True

                conn.commit()
                logger.info("Created user_preferences table")
            else:
                # Check if preferences exist
                cursor.execute(
                    "SELECT 1 FROM user_preferences WHERE user_id = %s",
                    (user_id,)
                )
                preferences_exist = cursor.fetchone() is not None

            # Columns read back after the write; timezone only when present
            returned_fields = "email_notifications, default_view, theme, expiring_soon_days, notification_frequency, notification_time"
            if has_timezone_column:
                returned_fields += ", timezone"

            if preferences_exist:
                # Collect only the fields the client actually supplied.
                # Column names come from this fixed set, never from input.
                changes = {}
                if email_notifications is not None:
                    changes['email_notifications'] = email_notifications
                if default_view:
                    changes['default_view'] = default_view
                if theme:
                    changes['theme'] = theme
                if expiring_soon_days is not None:
                    changes['expiring_soon_days'] = expiring_soon_days
                if notification_frequency:
                    changes['notification_frequency'] = notification_frequency
                if notification_time:
                    changes['notification_time'] = notification_time
                if timezone and has_timezone_column:
                    changes['timezone'] = timezone

                if changes:
                    set_clause = ', '.join(f"{column} = %s" for column in changes)
                    update_query = f"""
                        UPDATE user_preferences
                        SET {set_clause}, updated_at = NOW()
                        WHERE user_id = %s
                        RETURNING {returned_fields}
                    """
                    cursor.execute(update_query, list(changes.values()) + [user_id])
                else:
                    # No fields to update: just read the current row
                    cursor.execute(
                        f"""
                        SELECT {returned_fields}
                        FROM user_preferences
                        WHERE user_id = %s
                        """,
                        (user_id,)
                    )
                preferences_data = cursor.fetchone()
            else:
                # Create new preferences, defaulting any field not supplied
                insert_columns = [
                    'user_id',
                    'email_notifications',
                    'default_view',
                    'theme',
                    'expiring_soon_days',
                    'notification_frequency',
                    'notification_time',
                ]
                insert_values = [
                    user_id,
                    email_notifications if email_notifications is not None else True,
                    default_view or 'grid',
                    theme or 'light',
                    expiring_soon_days if expiring_soon_days is not None else 30,
                    notification_frequency or 'daily',
                    notification_time or '09:00',
                ]
                if has_timezone_column:
                    insert_columns.append('timezone')
                    insert_values.append(timezone or 'UTC')

                placeholders = ', '.join(['%s'] * len(insert_columns))
                cursor.execute(
                    f"""
                    INSERT INTO user_preferences ({', '.join(insert_columns)})
                    VALUES ({placeholders})
                    RETURNING {returned_fields}
                    """,
                    insert_values
                )
                preferences_data = cursor.fetchone()

            # Format preferences data
            preferences = {
                'email_notifications': preferences_data[0],
                'default_view': preferences_data[1],
                'theme': preferences_data[2],
                'expiring_soon_days': preferences_data[3],
                'notification_frequency': preferences_data[4],
                'notification_time': preferences_data[5]
            }

            # Add timezone from the row when the column exists, otherwise
            # echo the request value
            if has_timezone_column and len(preferences_data) > 6:
                preferences['timezone'] = preferences_data[6]
            else:
                preferences['timezone'] = timezone or 'UTC'

            conn.commit()

            return jsonify(preferences), 200
        except Exception as e:
            conn.rollback()
            logger.error(f"Database error in update_preferences: {str(e)}")

            # Return the submitted values (defaults where missing) as fallback
            fallback_preferences = {
                'email_notifications': email_notifications if email_notifications is not None else True,
                'default_view': default_view or 'grid',
                'theme': theme or 'light',
                'expiring_soon_days': expiring_soon_days if expiring_soon_days is not None else 30,
                'notification_frequency': notification_frequency or 'daily',
                'notification_time': notification_time or '09:00',
                'timezone': timezone or 'UTC'
            }

            return jsonify(fallback_preferences), 200
        finally:
            cursor.close()
            release_db_connection(conn)
    except Exception as e:
        logger.error(f"Error in update_preferences: {str(e)}")

        # Return default preferences as fallback
        default_preferences = {
            'email_notifications': True,
            'default_view': 'grid',
            'theme': 'light',
            'expiring_soon_days': 30,
            'notification_frequency': 'daily',
            'notification_time': '09:00',
            'timezone': 'UTC'
        }

        return jsonify(default_preferences), 200
# Admin User Management Endpoints
@app.route('/api/admin/users', methods=['GET'])
@admin_required
def get_all_users():
    """List every user account, newest first (admin only).

    Date and datetime columns are rendered as ISO-8601 strings so the rows
    can be serialized to JSON. Responds with 500 and a generic message on
    failure.
    """
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            cur.execute('''
                SELECT id, username, email, first_name, last_name, is_active, is_admin, created_at, last_login
                FROM users
                ORDER BY created_at DESC
            ''')
            rows = cur.fetchall()
            column_names = [description[0] for description in cur.description]

        def _json_safe(value):
            # Temporal values become ISO strings; everything else is kept.
            if isinstance(value, (datetime, date)):
                return value.isoformat() if value else None
            return value

        users_list = [
            {name: _json_safe(cell) for name, cell in zip(column_names, row)}
            for row in rows
        ]

        return jsonify(users_list), 200
    except Exception as e:
        logger.error(f"Error retrieving users: {e}")
        return jsonify({"message": "Failed to retrieve users"}), 500
    finally:
        if conn:
            release_db_connection(conn)
def _parse_bool_flag(value):
    """Coerce a JSON flag value to ``bool``.

    Accepts real booleans, ``None`` (treated as False), the integers 0/1 and
    the usual textual spellings. Raises ``ValueError`` for anything else, so
    that e.g. the string ``"false"`` is never interpreted as True.
    """
    if isinstance(value, bool):
        return value
    if value is None:
        return False
    if isinstance(value, int) and value in (0, 1):
        return bool(value)
    if isinstance(value, str):
        text = value.strip().lower()
        if text in ('true', '1', 'yes', 'on'):
            return True
        if text in ('false', '0', 'no', 'off', ''):
            return False
    raise ValueError(f"not a boolean flag: {value!r}")


@app.route('/api/admin/users/<int:user_id>', methods=['PUT'])
@admin_required
def update_user(user_id):
    """Update a user's ``is_admin`` and/or ``is_active`` flags (admin only).

    Args:
        user_id: ID of the user to modify, taken from the URL.

    Returns:
        200 on success, 400 when the body is missing, contains no supported
        field or a flag is not a boolean, 403 when an admin targets their
        own account, 404 when the user does not exist, 500 on failure.
    """
    conn = None
    try:
        # Prevent modifying self
        if user_id == request.user['id']:
            return jsonify({"message": "Cannot modify your own admin status"}), 403

        # silent=True: a missing/malformed body yields None and a 400 below
        # instead of a TypeError on the membership tests.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"message": "No input data provided"}), 400

        conn = get_db_connection()
        with conn.cursor() as cur:
            # Check if user exists
            cur.execute('SELECT id FROM users WHERE id = %s', (user_id,))
            user = cur.fetchone()

            if not user:
                return jsonify({"message": "User not found"}), 404

            # Collect the supported fields present in the payload
            updates = []
            params = []

            for field in ('is_admin', 'is_active'):
                if field not in data:
                    continue
                try:
                    flag = _parse_bool_flag(data[field])
                except ValueError:
                    return jsonify({"message": f"{field} must be a boolean"}), 400
                updates.append(f'{field} = %s')
                params.append(flag)

            if not updates:
                return jsonify({"message": "No fields to update"}), 400

            # Build and execute update query; column names come from the
            # fixed tuple above, values are bound as parameters.
            query = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"
            params.append(user_id)

            cur.execute(query, params)
            conn.commit()

            return jsonify({"message": "User updated successfully"}), 200
    except Exception as e:
        logger.error(f"Error updating user: {e}")
        if conn:
            conn.rollback()
        return jsonify({"message": f"Failed to update user: {str(e)}"}), 500
    finally:
        if conn:
            release_db_connection(conn)
@app.route('/api/admin/users/<int:user_id>', methods=['DELETE'])
@admin_required
def delete_user(user_id):
    """Remove a user and the rows that belong to them (admin only).

    Deletes the user's warranties, password reset tokens and sessions before
    the user row, committing once at the end. Admins cannot delete their own
    account through this endpoint (403). Responds 404 for an unknown user
    and 500 (after rollback) on failure.
    """
    conn = None
    try:
        logger.info(f"Delete user request received for user_id: {user_id}")

        # Self-deletion is refused here
        if request.user['id'] == user_id:
            logger.warning(f"User {request.user['username']} attempted to delete their own account")
            return jsonify({"message": "Cannot delete your own account through admin API"}), 403

        conn = get_db_connection()

        with conn.cursor() as cur:
            # Look the target up first so a missing user yields a 404
            cur.execute('SELECT id, username FROM users WHERE id = %s', (user_id,))
            target = cur.fetchone()

            if not target:
                logger.warning(f"User with ID {user_id} not found")
                return jsonify({"message": "User not found"}), 404

            logger.info(f"Deleting user {target[1]} (ID: {target[0]})")

            # Dependent rows go first, in this order; each entry pairs the
            # statement with the noun used in its log line.
            dependent_deletes = (
                ('DELETE FROM warranties WHERE user_id = %s', 'warranties'),
                ('DELETE FROM password_reset_tokens WHERE user_id = %s', 'password reset tokens'),
                ('DELETE FROM user_sessions WHERE user_id = %s', 'sessions'),
            )
            for statement, noun in dependent_deletes:
                cur.execute(statement, (user_id,))
                removed = cur.rowcount
                logger.info(f"Deleted {removed} {noun} belonging to user {user_id}")

            # Finally the user row itself
            cur.execute('DELETE FROM users WHERE id = %s', (user_id,))
            affected = cur.rowcount
            logger.info(f"Deleted user {user_id}, affected rows: {affected}")

            conn.commit()
            logger.info(f"User {user_id} deleted successfully")

            return jsonify({"message": "User deleted successfully"}), 200
    except Exception as e:
        logger.error(f"Error deleting user: {e}")
        if conn:
            conn.rollback()
        return jsonify({"message": f"Failed to delete user: {str(e)}"}), 500
    finally:
        if conn:
            release_db_connection(conn)
# Site settings
@app.route('/api/admin/settings', methods=['GET'])
@admin_required
def get_site_settings():
    """Return all site settings as a key/value JSON object (admin only).

    Creates the ``site_settings`` table when it is missing and stores the
    default ``registration_enabled = 'true'`` when that key is absent.
    Responds with 500 and a generic message on failure.
    """
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            # Probe for the table before reading from it
            cur.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_name = 'site_settings'
                )
            """)
            (table_present,) = cur.fetchone()[:1]

            if not table_present:
                # First use: create the key/value store
                cur.execute("""
                    CREATE TABLE site_settings (
                        key VARCHAR(255) PRIMARY KEY,
                        value TEXT,
                        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
                conn.commit()

            # Load every stored pair into a dict
            cur.execute('SELECT key, value FROM site_settings')
            settings = {}
            for row in cur.fetchall():
                settings[row[0]] = row[1]

            # Seed the registration switch when it has never been stored
            default_key, default_value = 'registration_enabled', 'true'
            if default_key not in settings:
                settings[default_key] = default_value
                cur.execute(
                    'INSERT INTO site_settings (key, value) VALUES (%s, %s)',
                    (default_key, default_value)
                )
                conn.commit()

            return jsonify(settings), 200
    except Exception as e:
        logger.error(f"Error retrieving site settings: {e}")
        return jsonify({"message": "Failed to retrieve site settings"}), 500
    finally:
        if conn:
            release_db_connection(conn)
@app.route('/api/admin/settings', methods=['PUT'])
@admin_required
def update_site_settings():
    """Upsert site settings from a JSON object (admin only).

    Each key/value pair of the request body is inserted into
    ``site_settings`` or overwrites the existing row. Boolean values are
    stored as the strings ``'true'`` / ``'false'``.

    Returns:
        200 on success, 400 when the body is missing or not a JSON object,
        500 (after rollback) on failure.
    """
    conn = None
    try:
        # silent=True: a missing/malformed body yields None and a 400 here
        # instead of an AttributeError (and a 500) on data.items().
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"message": "No input data provided"}), 400

        conn = get_db_connection()
        with conn.cursor() as cur:
            # Check if settings table exists
            cur.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_name = 'site_settings'
                )
            """)
            table_exists = cur.fetchone()[0]

            # Create settings table if it doesn't exist
            if not table_exists:
                cur.execute("""
                    CREATE TABLE site_settings (
                        key VARCHAR(255) PRIMARY KEY,
                        value TEXT,
                        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)

            # Update settings
            for key, value in data.items():
                # Convert boolean to string
                if isinstance(value, bool):
                    value = str(value).lower()

                # EXCLUDED refers to the row proposed for insertion, so the
                # value only has to be bound once.
                cur.execute("""
                    INSERT INTO site_settings (key, value, updated_at)
                    VALUES (%s, %s, CURRENT_TIMESTAMP)
                    ON CONFLICT (key)
                    DO UPDATE SET value = EXCLUDED.value, updated_at = CURRENT_TIMESTAMP
                """, (key, value))

            conn.commit()

            return jsonify({"message": "Settings updated successfully"}), 200
    except Exception as e:
        logger.error(f"Error updating site settings: {e}")
        if conn:
            conn.rollback()
        return jsonify({"message": "Failed to update site settings"}), 500
    finally:
        if conn:
            release_db_connection(conn)
# Public endpoint reporting whether new-user registration is currently enabled
@app.route('/api/auth/registration-status', methods=['GET'])
def check_registration_status():
    """Tell the client whether registration is enabled.

    Reads ``registration_enabled`` from ``site_settings``. Registration is
    reported as enabled when the table or the key is missing, and also when
    the lookup fails for any reason. Always responds with HTTP 200.
    """
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            # Without the settings table nothing can have disabled registration
            cur.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_name = 'site_settings'
                )
            """)
            has_settings_table = cur.fetchone()[0]

            setting_row = None
            if has_settings_table:
                cur.execute("SELECT value FROM site_settings WHERE key = 'registration_enabled'")
                setting_row = cur.fetchone()

            if not has_settings_table or not setting_row:
                # Missing table or missing key: enabled by default
                return jsonify({"enabled": True}), 200

            # Stored as text; only the literal 'true' (any case) enables it
            is_enabled = setting_row[0].lower() == 'true'
            return jsonify({"enabled": is_enabled}), 200
    except Exception as e:
        logger.error(f"Error checking registration status: {e}")
        return jsonify({"enabled": True}), 200  # Default to enabled on error
    finally:
        if conn:
            release_db_connection(conn)
# File serving endpoints
@app.route('/api/files/<path:filename>', methods=['GET', 'POST'])
@token_required
def serve_file(filename):
    """Serve an uploaded file to any authenticated user.

    Only paths starting with ``uploads/`` are accepted; the prefix is
    stripped and the remainder is served from the upload directory.
    No per-user ownership check is performed here.

    Args:
        filename: Requested path from the URL, expected as ``uploads/<name>``.

    Returns:
        The file response, 403 for paths outside ``uploads/``, the HTTP
        error raised by Flask (e.g. 404 for a missing file), or 500 on
        unexpected errors.
    """
    # Imported locally so this handler does not depend on a file-level import
    from werkzeug.exceptions import HTTPException

    uploads_prefix = 'uploads/'
    try:
        logger.info(f"File access request for {filename} by user {request.user['id']}")

        if not filename.startswith(uploads_prefix):
            logger.warning(f"Attempted access to non-uploads file: {filename}")
            return jsonify({"message": "Access denied"}), 403

        # Remove 'uploads/' prefix for send_from_directory
        file_path = filename[len(uploads_prefix):]

        return send_from_directory('/data/uploads', file_path)
    except HTTPException:
        # send_from_directory signals a missing or unsafe path with NotFound;
        # let it through so the client gets 404 rather than a generic 500.
        raise
    except Exception as e:
        logger.error(f"Error serving file {filename}: {e}")
        return jsonify({"message": "Error accessing file"}), 500
@app.route('/api/secure-file/<path:filename>', methods=['GET', 'POST'])
@token_required
def secure_file_access(filename):
    """Serve an uploaded file after checking the caller may access it.

    Admins may access every file. Other users may only access a file that
    is referenced as ``invoice_path`` or ``manual_path`` (stored as
    ``uploads/<filename>``) by a warranty they own.

    Args:
        filename: Path of the file relative to the upload directory.

    Returns:
        The file response, 400 for a suspicious path, 403 when the caller
        is not authorized, the HTTP error raised by Flask (e.g. 404 for a
        missing file), or 500 on unexpected errors.
    """
    # Imported locally so this handler does not depend on a file-level import
    from werkzeug.exceptions import HTTPException

    try:
        logger.info(f"Secure file access request for {filename} by user {request.user['id']}")

        # Security check for path traversal
        if '..' in filename or filename.startswith('/'):
            logger.warning(f"Potential path traversal attempt detected: {filename} by user {request.user['id']}")
            return jsonify({"message": "Invalid file path"}), 400

        # Check if user is authorized to access this file
        conn = get_db_connection()
        try:
            with conn.cursor() as cur:
                # Find warranties that reference this file
                stored_path = f"uploads/{filename}"
                query = """
                    SELECT w.id, w.user_id
                    FROM warranties w
                    WHERE w.invoice_path = %s OR w.manual_path = %s
                """
                cur.execute(query, (stored_path, stored_path))
                results = cur.fetchall()

                user_id = request.user['id']
                is_admin = request.user.get('is_admin', False)

                # Admins can access all files; everyone else must own at
                # least one warranty that references the file.
                authorized = is_admin or any(
                    owner_id == user_id for _, owner_id in results
                )

                if not authorized:
                    logger.warning(f"Unauthorized file access attempt: {filename} by user {user_id}")
                    return jsonify({"message": "You are not authorized to access this file"}), 403

                # Serve the file securely
                return send_from_directory('/data/uploads', filename)
        finally:
            release_db_connection(conn)
    except HTTPException:
        # send_from_directory signals a missing or unsafe path with NotFound;
        # let it through so the client gets 404 rather than a generic 500.
        raise
    except Exception as e:
        logger.error(f"Error in secure file access for {filename}: {e}")
        return jsonify({"message": "Error accessing file"}), 500
def get_expiring_warranties():
    """
    Collect the warranties that fall inside each owner's "expiring soon" window.

    A warranty qualifies when it expires after today and no later than today
    plus the owner's ``expiring_soon_days`` (30 when no preference row
    exists), the owner is active, and email notifications are not disabled.

    Returns a list of dicts with the keys ``email``, ``first_name``
    ('User' when empty), ``product_name`` and ``expiration_date``
    (formatted YYYY-MM-DD). Returns an empty list when the query fails.
    """
    conn = None
    try:
        conn = get_db_connection()

        with conn.cursor() as cur:
            reference_day = date.today()

            cur.execute("""
                SELECT
                    u.email,
                    u.first_name,
                    w.product_name,
                    w.expiration_date,
                    COALESCE(up.expiring_soon_days, 30) AS expiring_soon_days
                FROM
                    warranties w
                JOIN
                    users u ON w.user_id = u.id
                LEFT JOIN
                    user_preferences up ON u.id = up.user_id
                WHERE
                    w.expiration_date > %s
                    AND w.expiration_date <= %s + (COALESCE(up.expiring_soon_days, 30) || ' days')::interval
                    AND u.is_active = TRUE
                    AND COALESCE(up.email_notifications, TRUE) = TRUE;
            """, (reference_day, reference_day))

            # The fifth column (the window size) is selected but not needed
            # in the result, so it is discarded while unpacking.
            return [
                {
                    'email': address,
                    'first_name': given_name or 'User',  # Default if first_name is NULL
                    'product_name': product,
                    'expiration_date': expires_on.strftime('%Y-%m-%d'),
                }
                for address, given_name, product, expires_on, _window in cur.fetchall()
            ]

    except Exception as e:
        logger.error(f"Error retrieving expiring warranties: {e}")
        return []  # Return an empty list on error
    finally:
        if conn:
            release_db_connection(conn)
def format_expiration_email(user, warranties):
    """
    Format an email notification for expiring warranties.

    Args:
        user: Mapping with the recipient's ``first_name`` and ``email``.
        warranties: Iterable of mappings with ``product_name`` and
            ``expiration_date``.

    Returns a MIMEMultipart ('alternative') message with a plain-text part
    followed by an HTML part. Values interpolated into the HTML part are
    HTML-escaped, because product names and first names are user-supplied
    text and may contain characters such as ``<`` or ``&``.
    """
    # Imported locally so this function does not depend on a file-level import
    from html import escape

    subject = "Warracker: Upcoming Warranty Expirations"

    # Plain-text version (values are inserted verbatim)
    text_lines = [
        f"Hello {user['first_name']},\n\n",
        "The following warranties are expiring soon:\n\n",
    ]

    # HTML version (values are escaped before insertion)
    html_chunks = [f"""\
<html>
  <head></head>
  <body>
    <p>Hello {escape(str(user['first_name']))},</p>
    <p>The following warranties are expiring soon:</p>
    <table border="1" style="border-collapse: collapse;">
      <thead>
        <tr>
          <th style="padding: 8px; text-align: left;">Product Name</th>
          <th style="padding: 8px; text-align: left;">Expiration Date</th>
        </tr>
      </thead>
      <tbody>
"""]

    for warranty in warranties:
        text_lines.append(
            f"- {warranty['product_name']} (expires on {warranty['expiration_date']})\n"
        )
        html_chunks.append(f"""\
        <tr>
          <td style="padding: 8px;">{escape(str(warranty['product_name']))}</td>
          <td style="padding: 8px;">{escape(str(warranty['expiration_date']))}</td>
        </tr>
""")

    text_lines.append("\nLog in to Warracker to view details:\n")
    text_lines.append("http://localhost:8080\n\n")
    text_lines.append("Manage your notification settings:\n")
    text_lines.append("http://localhost:8080/settings.html\n")

    html_chunks.append("""\
      </tbody>
    </table>
    <p>Log in to <a href="http://localhost:8080">Warracker</a> to view details.</p>
    <p>Manage your notification settings <a href="http://localhost:8080/settings.html">here</a>.</p>
  </body>
</html>
""")

    text_body = "".join(text_lines)
    html_body = "".join(html_chunks)

    # Create a MIMEMultipart object for both text and HTML
    msg = MIMEMultipart('alternative')
    msg['Subject'] = subject
    msg['From'] = os.environ.get('SMTP_USERNAME', 'notifications@warracker.com')
    msg['To'] = user['email']

    # Attach plain text first, HTML second
    msg.attach(MIMEText(text_body, 'plain'))
    msg.attach(MIMEText(html_body, 'html'))

    return msg
# Create a lock for the notification function.
# send_expiration_notifications() tries to acquire it without blocking and
# skips its run when the lock is already held.
notification_lock = threading.Lock()
# Track when notifications were last sent to each user
# NOTE(review): the key and value layout of this dict is not visible in this
# part of the file - confirm against the code that reads and writes it.
last_notification_sent = {}
def _is_notification_due(email, notification_time, frequency, user_local_time):
    """
    Decide whether a scheduled notification should go out for one user now.

    Args:
        email: Recipient address; used only in the log line.
        notification_time: Preferred send time as an 'HH:MM' string.
        frequency: 'daily', 'weekly' (sent on Mondays) or 'monthly' (sent on
            the 1st). Any other value is never due.
        user_local_time: Current time as a datetime in the user's timezone.

    Returns:
        bool: True when today matches the frequency and the current local time
        lies between 3 minutes before and 2 minutes after the preferred time.

    Raises:
        ValueError: If notification_time does not split into two integers.
    """
    if frequency == 'daily':
        frequency_matches = True
    elif frequency == 'weekly':
        frequency_matches = user_local_time.weekday() == 0  # Monday
    elif frequency == 'monthly':
        frequency_matches = user_local_time.day == 1
    else:
        frequency_matches = False
    if not frequency_matches:
        return False

    time_hour, time_minute = map(int, notification_time.split(':'))
    current_hour = user_local_time.hour
    current_minute = user_local_time.minute
    # Negative while the current time is still before the preferred time
    time_diff = (current_hour * 60 + current_minute) - (time_hour * 60 + time_minute)

    # The window is wider than one scheduler interval so a run cannot step over
    # it; the caller's 10-minute de-duplication prevents repeat emails.
    send_window = 0 <= time_diff <= 2        # 0-2 minutes after the preferred time
    next_miss_window = -3 <= time_diff < 0   # 1-3 minutes before the preferred time
    logger.info(f"Time check for {email}: scheduled={time_hour}:{time_minute:02d}, " +
                f"current={current_hour}:{current_minute:02d}, diff={time_diff} min, " +
                f"send_window={send_window}, next_miss_window={next_miss_window}")
    return send_window or next_miss_window


def _collect_due_emails(utc_now):
    """
    Return the email addresses of users whose scheduled notification is due.

    Reads every active user with email notifications enabled, converts
    utc_now to each user's timezone and keeps those that are inside their
    send window and were not emailed during the last 10 minutes.

    Args:
        utc_now: Current time as a timezone-aware UTC datetime.

    Returns:
        set: Email addresses to notify in this run (possibly empty).

    Raises:
        Exception: Errors from the database layer propagate to the caller.
    """
    eligible_users_query = """
        SELECT
            u.id,
            u.email,
            u.first_name,
            up.notification_time,
            up.timezone,
            up.notification_frequency
        FROM users u
        JOIN user_preferences up ON u.id = up.user_id
        WHERE u.is_active = TRUE
        AND up.email_notifications = TRUE
    """
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            cur.execute(eligible_users_query)
            eligible_users = cur.fetchall()
    finally:
        # The per-user checks below need no database access, so hand the
        # connection back before running them
        if conn:
            release_db_connection(conn)

    if not eligible_users:
        logger.info("No users are eligible for notifications")
        return set()

    now_timestamp = int(utc_now.timestamp())
    due_emails = set()
    for _user_id, email, _first_name, notification_time, tz_name, frequency in eligible_users:
        try:
            user_local_time = utc_now.astimezone(pytz_timezone(tz_name or 'UTC'))
            if not _is_notification_due(email, notification_time, frequency, user_local_time):
                continue

            # The send window spans several scheduler runs; skip users who were
            # already emailed within the last 10 minutes
            last_sent = last_notification_sent.get(email)
            if last_sent is not None and now_timestamp - last_sent <= 600:
                last_sent_text = datetime.fromtimestamp(last_sent, pytz.UTC).strftime('%Y-%m-%d %H:%M:%S')
                logger.info(f"Skipping notification for {email} - already sent within the last 10 minutes (last sent: {last_sent_text})")
                continue

            due_emails.add(email)
            logger.info(f"User {email} eligible for notification at their local time {notification_time} ({tz_name})")
        except Exception as e:
            # A bad timezone or time value for one user must not block the others
            logger.error(f"Error processing timezone for user {email}: {e}")

    if not due_emails:
        logger.info("No users are scheduled for notifications at their local time")
    return due_emails


def send_expiration_notifications(manual_trigger=False):
    """
    Main function to send warranty expiration notifications.

    Retrieves expiring warranties, groups them by user and emails each user
    one summary. Only one run executes at a time; a call made while another
    run holds the lock returns immediately. Errors are logged, never raised.

    Args:
        manual_trigger (bool): Whether this function was triggered manually
            (vs scheduled). If True, notification frequency/time preferences
            are ignored and every user with expiring warranties is emailed,
            except users emailed less than 2 minutes ago. If False, only
            users whose preferred time is due right now are emailed.

    Returns:
        None
    """
    # Use a lock to prevent concurrent executions
    if not notification_lock.acquire(blocking=False):
        logger.info("Notification job already running, skipping this execution")
        return

    try:
        logger.info("Starting expiration notification process")

        # None means "no restriction" (manual trigger); otherwise only these
        # addresses may be emailed in this run
        due_emails = None
        if not manual_trigger:
            try:
                due_emails = _collect_due_emails(datetime.now(pytz.UTC))
            except Exception as e:
                logger.error(f"Error determining notification eligibility: {e}")
                return
            if not due_emails:
                return
            logger.info(f"Found {len(due_emails)} users eligible for notifications now")

        expiring_warranties = get_expiring_warranties()
        if not expiring_warranties:
            logger.info("No expiring warranties found.")
            return

        # Group warranties by user, keeping only users that are due in this run
        users_warranties = {}
        for warranty in expiring_warranties:
            email = warranty['email']
            if due_emails is not None and email not in due_emails:
                continue
            user_entry = users_warranties.setdefault(
                email, {'first_name': warranty['first_name'], 'warranties': []}
            )
            user_entry['warranties'].append(warranty)

        if not users_warranties:
            logger.info("No expiring warranties found for the users scheduled at this time.")
            return

        # Get SMTP settings from environment variables with fallbacks
        smtp_host = os.environ.get('SMTP_HOST', 'localhost')
        smtp_port = int(os.environ.get('SMTP_PORT', '1025'))
        smtp_username = os.environ.get('SMTP_USERNAME', 'notifications@warracker.com')
        smtp_password = os.environ.get('SMTP_PASSWORD', '')

        try:
            # Use SMTP_SSL for port 465, regular SMTP for other ports.
            # Do not import smtplib inside this function: a local import turns
            # the name into a local variable and breaks the non-465 branch.
            if smtp_port == 465:
                logger.info("Using SMTP_SSL connection for port 465")
                server = smtplib.SMTP_SSL(smtp_host, smtp_port)
            else:
                server = smtplib.SMTP(smtp_host, smtp_port)

            # The context manager sends QUIT and closes the socket even when
            # starttls/login/sendmail raise
            with server:
                # Start TLS for security if not local debug server and not using SSL
                if smtp_port != 465 and smtp_host != 'localhost':
                    server.starttls()

                # Login if credentials are provided
                if smtp_username and smtp_password:
                    logger.info(f"Logging in with username: {smtp_username}")
                    server.login(smtp_username, smtp_password)

                send_time = datetime.now(pytz.UTC)
                timestamp = int(send_time.timestamp())
                sent_at = send_time.strftime('%Y-%m-%d %H:%M:%S')
                emails_sent = 0

                for email, user_data in users_warranties.items():
                    # Manual triggers skip the schedule but still refuse to
                    # re-send within 2 minutes, which guards against double-clicks
                    if manual_trigger and email in last_notification_sent:
                        if timestamp - last_notification_sent[email] < 120:
                            logger.info(f"Manual trigger: Skipping notification for {email} - already sent within the last 2 minutes")
                            continue

                    msg = format_expiration_email(
                        {'first_name': user_data['first_name'], 'email': email},
                        user_data['warranties']
                    )
                    try:
                        server.sendmail(smtp_username, email, msg.as_string())
                        # Record timestamp when we sent the notification
                        last_notification_sent[email] = timestamp
                        emails_sent += 1
                        logger.info(f"Expiration notification email sent to {email} for {len(user_data['warranties'])} warranties at {sent_at}")
                    except Exception as e:
                        # One failed recipient must not stop the remaining emails
                        logger.error(f"Error sending email to {email}: {e}")

                logger.info(f"Email notification process completed. Sent {emails_sent} emails out of {len(users_warranties)} eligible users.")
        except Exception as e:
            logger.error(f"Error connecting to SMTP server: {e}")
            logger.error(f"SMTP details - Host: {smtp_host}, Port: {smtp_port}, Username: {smtp_username}")
    except Exception as e:
        logger.error(f"Error in send_expiration_notifications: {e}")
    finally:
        notification_lock.release()
# Background scheduler that runs the periodic notification job
scheduler = BackgroundScheduler(
    job_defaults=dict(
        coalesce=True,           # Combine multiple pending executions into one
        max_instances=1,         # Only one instance of a job may run at a time
        misfire_grace_time=300,  # A misfired job may still run up to 5 minutes late
    )
)
# Helper to check if this is the process that should run the scheduler
def should_run_scheduler():
    """
    Report whether this process should start the notification scheduler.

    The scheduler is started in exactly two situations:
      * under Gunicorn, in the worker identified as worker 0, either through
        GUNICORN_WORKER_PROCESS_NAME == 'worker-0' or through
        GUNICORN_WORKER_CLASS being set with GUNICORN_WORKER_ID equal to 0
        (a missing id counts as 0);
      * when the module is executed directly as a script.

    Returns:
        bool: True if the scheduler should be started in this process.
    """
    if os.environ.get('GUNICORN_WORKER_PROCESS_NAME') == 'worker-0':
        is_worker_zero = True
    elif os.environ.get('GUNICORN_WORKER_CLASS'):
        raw_worker_id = os.environ.get('GUNICORN_WORKER_ID', '0')
        try:
            is_worker_zero = int(raw_worker_id) == 0
        except ValueError:
            # This runs at import time; a malformed id must not crash startup.
            # Such a process is simply not treated as worker 0.
            logger.warning(f"Ignoring non-numeric GUNICORN_WORKER_ID {raw_worker_id!r}; scheduler not started in this worker")
            is_worker_zero = False
    else:
        is_worker_zero = False

    # For gunicorn
    if is_worker_zero:
        logger.info("Starting scheduler in Gunicorn worker 0")
        return True

    # For development server
    if __name__ == '__main__':
        logger.info("Starting scheduler in development server")
        return True

    # Default case - don't start scheduler
    return False
# Register and start the notification job only in the process selected by
# should_run_scheduler()
if should_run_scheduler():
    # Poll every 2 minutes so per-user notification times are matched closely
    scheduler.add_job(
        send_expiration_notifications,
        trigger="interval",
        minutes=2,
        id='notification_job',
    )
    scheduler.start()
    logger.info("Email notification scheduler started - checking every 2 minutes")
    # Shut the scheduler down when the interpreter exits
    atexit.register(lambda: scheduler.shutdown())
# When this module is imported rather than executed as a script, set up the
# database immediately; a failure is logged and the import continues
if __name__ != '__main__':  # Only for production
    try:
        init_db()
        logger.info("Database initialized during application startup")
    except Exception as exc:
        logger.error(f"Database initialization error during startup: {exc}")
# Script entry point: serve the app on all interfaces, with debug mode enabled
# only when FLASK_DEBUG is '1'.
# NOTE(review): app.run() blocks until the server stops, so any route declared
# below this block is not registered while running in this mode - this block
# presumably belongs at the very end of the file; confirm and move.
if __name__ == '__main__':
    debug_enabled = os.environ.get('FLASK_DEBUG', '0') == '1'
    try:
        app.run(debug=debug_enabled, host='0.0.0.0')
    except Exception as exc:
        logger.error(f"Application startup error: {exc}")
# NOTE(review): this route is declared after the `if __name__ == '__main__'`
# block; it should be moved before that block so it is also registered when
# the app is started as a script.
@app.route('/api/admin/send-notifications', methods=['POST'])
@admin_required
def trigger_notifications():
    """
    Admin-only endpoint that runs the warranty expiration notification job
    on demand.

    Calls send_expiration_notifications(manual_trigger=True) synchronously.

    Returns:
        A JSON message with status 200 once the call returns without raising,
        or a JSON message plus error text with status 500 if it raises.
    """
    try:
        requester_id = request.user['id']
        logger.info(f"Manual notification trigger requested by admin user {requester_id}")
        send_expiration_notifications(manual_trigger=True)
        success_payload = {'message': 'Notifications triggered successfully'}
        return jsonify(success_payload), 200
    except Exception as exc:
        error_msg = f"Error triggering notifications: {str(exc)}"
        logger.error(error_msg)
        failure_payload = {'message': 'Failed to trigger notifications', 'error': error_msg}
        return jsonify(failure_payload), 500
@app.route('/api/timezones', methods=['GET'])
def get_timezones():
    """
    Return every pytz timezone, grouped by region.

    The region is the part of the name before the first '/'
    ('America/New_York' -> 'America'); names without a '/' such as 'UTC' are
    placed in an 'Other' group. Regions, and the entries inside each region,
    are sorted alphabetically (entries by their label).

    Returns:
        JSON list of {'region': ..., 'timezones': [{'value', 'label'}, ...]}
        with status 200, or an error message with status 500.
    """
    try:
        grouped = {}
        for tz_name in pytz.all_timezones:
            region, separator, remainder = tz_name.partition('/')
            if separator:
                # 'America/New_York' -> label 'New York (America/New_York)'
                city = remainder.replace('_', ' ')
                entry = {'value': tz_name, 'label': f"{city} ({tz_name})"}
            else:
                # Handle special cases like 'UTC'
                region = 'Other'
                entry = {'value': tz_name, 'label': tz_name}
            grouped.setdefault(region, []).append(entry)

        # Convert to a list of groups ordered by region name
        timezone_list = []
        for region in sorted(grouped):
            timezone_list.append({
                'region': region,
                'timezones': sorted(grouped[region], key=lambda item: item['label'])
            })
        return jsonify(timezone_list), 200
    except Exception as exc:
        logger.error(f"Error getting timezones: {exc}")
        return jsonify({'message': 'Failed to get timezones'}), 500
@app.route('/api/debug/warranty/<int:warranty_id>', methods=['GET'])
@token_required
def debug_warranty(warranty_id):
    """
    Return the raw database row of one warranty together with its serial
    numbers, for debugging.

    Admins may inspect any warranty; other users only their own.

    Args:
        warranty_id: Id of the warranty, taken from the URL.

    Returns:
        JSON with 'warranty' (column name -> value, plus 'serial_numbers') and
        'columns' (column names in table order); 404 if no matching row is
        found; 500 on any error.
    """
    conn = None
    try:
        user_id = request.user['id']
        is_admin = request.user['is_admin']
        conn = get_db_connection()
        with conn.cursor() as cur:
            # If admin, can see any warranty, otherwise only the user's own
            if is_admin:
                query = 'SELECT * FROM warranties WHERE id = %s'
                params = (warranty_id,)
            else:
                query = 'SELECT * FROM warranties WHERE id = %s AND user_id = %s'
                params = (warranty_id, user_id)
            cur.execute(query, params)
            row = cur.fetchone()
            if not row:
                return jsonify({"error": "Warranty not found or you don't have permission to view it"}), 404

            columns = [column[0] for column in cur.description]
            warranty_dict = {}
            for column, value in zip(columns, row):
                if isinstance(value, (datetime, date)):
                    # Dates are not JSON serializable; send ISO strings
                    value = value.isoformat()
                elif isinstance(value, Decimal):
                    # Decimals are not JSON serializable; send floats
                    value = float(value)
                warranty_dict[column] = value

            # Attach the serial numbers recorded for this warranty
            cur.execute('SELECT id, serial_number FROM serial_numbers WHERE warranty_id = %s', (warranty_id,))
            warranty_dict['serial_numbers'] = [
                {'id': serial_row[0], 'serial_number': serial_row[1]}
                for serial_row in cur.fetchall()
            ]

            return jsonify({
                'warranty': warranty_dict,
                'columns': columns
            })
    except Exception as exc:
        logger.error(f"Error retrieving warranty debug info: {exc}")
        return jsonify({"error": f"Failed to retrieve warranty debug info: {str(exc)}"}), 500
    finally:
        if conn:
            release_db_connection(conn)
@app.route('/api/debug/file-check', methods=['GET'])
@token_required
def debug_file_check():
    """
    Admin-only debug endpoint reporting whether a file under /data exists.

    Query parameters:
        path: A bare filename (looked up in /data/uploads), a path relative
            to /data, or a path starting with /data.

    Returns:
        JSON with 'requested_path', 'full_path', 'exists', 'is_file',
        'size_bytes', 'last_modified' and, for regular files, 'size_human'.
        403 for non-admins; 400 when no path is given or the path resolves
        to a location outside /data.
    """
    if not request.user.get('is_admin', False):
        return jsonify({"error": "Admin access required"}), 403

    filepath = request.args.get('path')
    if not filepath:
        return jsonify({"error": "No file path provided"}), 400

    # Check if this is just a filename or a path
    if '/' not in filepath:
        filepath = os.path.join('uploads', filepath)

    # Prepend /data if it doesn't start with it
    if not filepath.startswith('/data'):
        full_path = os.path.join('/data', filepath)
    else:
        full_path = filepath

    # The prefix test above is not a containment check: os.path.join discards
    # '/data' for absolute inputs, and '/data/../x' or '/database' pass the
    # startswith test. Resolve '..' and symlinks, then require the result to
    # sit inside the data directory.
    try:
        data_root = os.path.realpath('/data')
        resolved_path = os.path.realpath(full_path)
        inside_data_root = os.path.commonpath([data_root, resolved_path]) == data_root
    except ValueError:
        # e.g. an embedded NUL byte in the requested path
        inside_data_root = False
    if not inside_data_root:
        logger.warning(f"Rejected file-check request outside /data: {filepath!r}")
        return jsonify({"error": "Path must be located inside /data"}), 400

    result = {
        'requested_path': filepath,
        'full_path': full_path,
        'exists': os.path.exists(resolved_path),
        'is_file': os.path.isfile(resolved_path),
        'size_bytes': None,
        'last_modified': None
    }

    if result['is_file']:
        try:
            # One stat call supplies both size and modification time
            stat_info = os.stat(resolved_path)
            result['size_bytes'] = stat_info.st_size
            result['last_modified'] = datetime.fromtimestamp(stat_info.st_mtime).isoformat()
            result['size_human'] = f"{stat_info.st_size / 1024:.2f} KB" if stat_info.st_size else None
        except OSError as e:
            result['error'] = str(e)

    return jsonify(result)