mirror of
https://github.com/DRYTRIX/TimeTracker.git
synced 2026-05-18 04:08:48 -05:00
5df748b30e
- Add POST /api/v1/time-entries/import-csv and POST /api/v1/time-entries/bulk. - Support Idempotency-Key on POST /api/v1/time-entries with 24h replay per token. - Enforce per-token minute/hour limits (Redis when available, local fallback otherwise). - Extract bulk and CSV import logic into dedicated services; trim legacy api route surface.
254 lines
8.8 KiB
Python
254 lines
8.8 KiB
Python
"""API Token Authentication utilities for REST API"""
|
|
|
|
from datetime import datetime
|
|
from functools import wraps
|
|
|
|
from flask import current_app, g, jsonify, request
|
|
|
|
from app import db
|
|
from app.models import ApiToken, User
|
|
|
|
|
|
def extract_token_from_request():
|
|
"""Extract API token from request headers
|
|
|
|
Supports multiple formats:
|
|
- Authorization: Bearer <token>
|
|
- Authorization: Token <token>
|
|
- X-API-Key: <token>
|
|
|
|
Returns:
|
|
str or None: The token if found
|
|
"""
|
|
# Check Authorization header
|
|
auth_header = request.headers.get("Authorization", "")
|
|
if auth_header:
|
|
parts = auth_header.split()
|
|
if len(parts) == 2:
|
|
scheme = parts[0].lower()
|
|
if scheme in ("bearer", "token"):
|
|
return parts[1]
|
|
|
|
# Check X-API-Key header
|
|
api_key = request.headers.get("X-API-Key")
|
|
if api_key:
|
|
return api_key
|
|
|
|
return None
|
|
|
|
|
|
def authenticate_token(token_string, record_usage: bool = True):
|
|
"""Authenticate an API token and return the associated user
|
|
|
|
Args:
|
|
token_string: The plain token string
|
|
record_usage: If True, increment usage counters (commit). Set False when rate limit runs first.
|
|
|
|
Returns:
|
|
tuple: (User, ApiToken, error_message) if invalid, (User, ApiToken, None) if valid
|
|
"""
|
|
if not token_string or not token_string.startswith("tt_"):
|
|
return None, None, "Invalid token format"
|
|
|
|
# Get token hash
|
|
token_hash = ApiToken.hash_token(token_string)
|
|
|
|
# Find token in database
|
|
api_token = ApiToken.query.filter_by(token_hash=token_hash).first()
|
|
|
|
if not api_token:
|
|
return None, None, "Token not found"
|
|
|
|
# Check if token is active
|
|
if not api_token.is_active:
|
|
return None, None, "Token has been revoked"
|
|
|
|
# Check expiration
|
|
if api_token.expires_at and api_token.expires_at < datetime.utcnow():
|
|
return None, None, "Token has expired"
|
|
|
|
# Check IP whitelist if configured
|
|
if api_token.ip_whitelist:
|
|
client_ip = request.remote_addr
|
|
allowed_ips = [ip.strip() for ip in api_token.ip_whitelist.split(",") if ip.strip()]
|
|
|
|
# Simple IP matching (can be enhanced with CIDR support)
|
|
if client_ip not in allowed_ips:
|
|
# Check CIDR blocks if any
|
|
from ipaddress import ip_address, ip_network
|
|
|
|
ip_allowed = False
|
|
for allowed in allowed_ips:
|
|
try:
|
|
if "/" in allowed:
|
|
# CIDR block
|
|
if ip_address(client_ip) in ip_network(allowed, strict=False):
|
|
ip_allowed = True
|
|
break
|
|
elif client_ip == allowed:
|
|
ip_allowed = True
|
|
break
|
|
except ValueError:
|
|
# Invalid IP format, skip
|
|
continue
|
|
|
|
if not ip_allowed:
|
|
current_app.logger.warning(f"API token {api_token.token_prefix}... access denied from IP {client_ip}")
|
|
return None, None, "Access denied from this IP address"
|
|
|
|
# Get associated user
|
|
user = User.query.get(api_token.user_id)
|
|
if not user or not user.is_active:
|
|
return None, None, "User account is inactive"
|
|
|
|
# Record usage
|
|
try:
|
|
api_token.record_usage(request.remote_addr)
|
|
except Exception as e:
|
|
current_app.logger.warning(f"Failed to record API token usage: {e}")
|
|
|
|
return user, api_token, None
|
|
|
|
|
|
def require_api_token(required_scope=None):
|
|
"""Decorator to require API token authentication
|
|
|
|
Args:
|
|
required_scope: Optional scope(s) required for this endpoint. Either a single
|
|
string (e.g. 'read:projects') or a tuple/list of strings (any one of,
|
|
e.g. ('read:inventory', 'read:projects') for backward compatibility).
|
|
|
|
Usage:
|
|
@require_api_token('read:projects')
|
|
def get_projects():
|
|
...
|
|
|
|
@require_api_token(('read:inventory', 'read:projects'))
|
|
def list_stock_items_api():
|
|
...
|
|
"""
|
|
allowed_scopes = (required_scope,) if isinstance(required_scope, str) else (required_scope or ())
|
|
|
|
def decorator(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
# Extract token from request
|
|
token_string = extract_token_from_request()
|
|
|
|
if not token_string:
|
|
return (
|
|
jsonify(
|
|
{
|
|
"error": "Authentication required",
|
|
"message": "API token must be provided in Authorization header or X-API-Key header",
|
|
"error_code": "unauthorized",
|
|
}
|
|
),
|
|
401,
|
|
)
|
|
|
|
# Authenticate token (defer usage recording until after rate limit)
|
|
user, api_token, error_msg = authenticate_token(token_string, record_usage=False)
|
|
|
|
if not user or not api_token:
|
|
message = error_msg or "The provided API token is invalid or expired"
|
|
return (
|
|
jsonify(
|
|
{
|
|
"error": "Invalid token",
|
|
"message": message,
|
|
"error_code": "unauthorized",
|
|
}
|
|
),
|
|
401,
|
|
)
|
|
|
|
# Check scope if required (single scope or any of multiple)
|
|
if allowed_scopes:
|
|
has_any = any(api_token.has_scope(s) for s in allowed_scopes)
|
|
if not has_any:
|
|
required_display = allowed_scopes[0] if len(allowed_scopes) == 1 else ", ".join(allowed_scopes)
|
|
return (
|
|
jsonify(
|
|
{
|
|
"error": "Insufficient permissions",
|
|
"message": f'This endpoint requires one of: "{required_display}"',
|
|
"error_code": "forbidden",
|
|
"required_scope": required_display,
|
|
"available_scopes": api_token.scopes.split(",") if api_token.scopes else [],
|
|
}
|
|
),
|
|
403,
|
|
)
|
|
|
|
# Per-token rate limit (minute + hour)
|
|
try:
|
|
from app.utils.api_rate_limit import consume_api_token_rate_limit
|
|
|
|
allowed, rl_info = consume_api_token_rate_limit(api_token.id)
|
|
if not allowed:
|
|
retry_after = int(rl_info.get("retry_after_seconds") or 60)
|
|
resp = jsonify(
|
|
{
|
|
"error": "Rate limit exceeded",
|
|
"message": "Too many requests for this API token. Try again later.",
|
|
"error_code": "rate_limited",
|
|
"limit_per_minute": rl_info.get("limit_minute"),
|
|
"limit_per_hour": rl_info.get("limit_hour"),
|
|
"remaining_per_minute": rl_info.get("remaining_minute"),
|
|
"remaining_per_hour": rl_info.get("remaining_hour"),
|
|
}
|
|
)
|
|
resp.status_code = 429
|
|
resp.headers["Retry-After"] = str(retry_after)
|
|
return resp
|
|
except Exception as e:
|
|
current_app.logger.warning("API token rate limit check failed (allowing request): %s", e)
|
|
|
|
try:
|
|
api_token.record_usage(request.remote_addr)
|
|
except Exception as e:
|
|
current_app.logger.warning(f"Failed to record API token usage: {e}")
|
|
|
|
# Store in request context
|
|
g.api_user = user
|
|
g.api_token = api_token
|
|
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
return decorator
|
|
|
|
|
|
def optional_api_token():
|
|
"""Decorator that allows both session-based and token-based authentication
|
|
|
|
Useful for endpoints that can be accessed via web UI or API
|
|
|
|
Usage:
|
|
@optional_api_token()
|
|
@login_required # Will be satisfied by API token if present
|
|
def get_data():
|
|
# Access user via current_user (session) or g.api_user (token)
|
|
pass
|
|
"""
|
|
|
|
def decorator(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
# Try to extract and authenticate token
|
|
token_string = extract_token_from_request()
|
|
|
|
if token_string:
|
|
user, api_token, error_msg = authenticate_token(token_string)
|
|
if user and api_token:
|
|
g.api_user = user
|
|
g.api_token = api_token
|
|
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
return decorator
|