mirror of
https://github.com/DRYTRIX/TimeTracker.git
synced 2026-05-19 04:40:32 -05:00
Merge pull request #625 from MacJediWizard/upstream-fix/ci-test-fixes-batch-1
fix(tests, api): four small CI failures from v5.5.6 test-fixture tightening
This commit is contained in:
@@ -6,7 +6,7 @@ Sub-blueprint for /api/v1/time-entries and /api/v1/timer/*.
|
||||
from flask import Blueprint, g, jsonify, request
|
||||
from marshmallow import ValidationError
|
||||
|
||||
from app.routes.api_v1_common import _parse_date_range, paginate_query, parse_datetime
|
||||
from app.routes.api_v1_common import _parse_date_range, paginate_query
|
||||
from app.schemas.time_entry_schema import TimeEntryCreateSchema, TimeEntryUpdateSchema
|
||||
from app.utils.api_auth import require_api_token
|
||||
from app.utils.api_responses import (
|
||||
@@ -16,7 +16,9 @@ from app.utils.api_responses import (
|
||||
validation_error_response,
|
||||
)
|
||||
|
||||
api_v1_time_entries_bp = Blueprint("api_v1_time_entries", __name__, url_prefix="/api/v1")
|
||||
api_v1_time_entries_bp = Blueprint(
|
||||
"api_v1_time_entries", __name__, url_prefix="/api/v1"
|
||||
)
|
||||
|
||||
|
||||
@api_v1_time_entries_bp.route("/time-entries", methods=["GET"])
|
||||
@@ -50,7 +52,9 @@ def list_time_entries():
|
||||
per_page = request.args.get("per_page", 50, type=int)
|
||||
|
||||
query = TimeEntry.query.options(
|
||||
joinedload(TimeEntry.project), joinedload(TimeEntry.user), joinedload(TimeEntry.task)
|
||||
joinedload(TimeEntry.project),
|
||||
joinedload(TimeEntry.user),
|
||||
joinedload(TimeEntry.task),
|
||||
)
|
||||
if project_id:
|
||||
query = query.filter(TimeEntry.project_id == project_id)
|
||||
@@ -67,7 +71,12 @@ def list_time_entries():
|
||||
|
||||
query = query.order_by(TimeEntry.start_time.desc())
|
||||
result = paginate_query(query, page, per_page)
|
||||
return jsonify({"time_entries": [e.to_dict() for e in result["items"]], "pagination": result["pagination"]})
|
||||
return jsonify(
|
||||
{
|
||||
"time_entries": [e.to_dict() for e in result["items"]],
|
||||
"pagination": result["pagination"],
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@api_v1_time_entries_bp.route("/time-entries/<int:entry_id>", methods=["GET"])
|
||||
@@ -79,7 +88,11 @@ def get_time_entry(entry_id):
|
||||
from app.models import TimeEntry
|
||||
|
||||
entry = (
|
||||
TimeEntry.query.options(joinedload(TimeEntry.project), joinedload(TimeEntry.user), joinedload(TimeEntry.task))
|
||||
TimeEntry.query.options(
|
||||
joinedload(TimeEntry.project),
|
||||
joinedload(TimeEntry.user),
|
||||
joinedload(TimeEntry.task),
|
||||
)
|
||||
.filter_by(id=entry_id)
|
||||
.first_or_404()
|
||||
)
|
||||
@@ -92,7 +105,9 @@ def get_time_entry(entry_id):
|
||||
@require_api_token("write:time_entries")
|
||||
def import_time_entries_csv():
|
||||
"""Import time entries from CSV (header row required)."""
|
||||
from app.services.time_entry_csv_import_service import import_time_entries_from_csv_text
|
||||
from app.services.time_entry_csv_import_service import (
|
||||
import_time_entries_from_csv_text,
|
||||
)
|
||||
|
||||
csv_text = ""
|
||||
if request.files and request.files.get("file"):
|
||||
@@ -104,7 +119,9 @@ def import_time_entries_csv():
|
||||
else:
|
||||
csv_text = request.get_data(as_text=True) or ""
|
||||
|
||||
result, status = import_time_entries_from_csv_text(csv_text, user_id=g.api_user.id, is_admin=g.api_user.is_admin)
|
||||
result, status = import_time_entries_from_csv_text(
|
||||
csv_text, user_id=g.api_user.id, is_admin=g.api_user.is_admin
|
||||
)
|
||||
return jsonify(result), status
|
||||
|
||||
|
||||
@@ -128,11 +145,17 @@ def bulk_time_entries():
|
||||
try:
|
||||
ids.append(int(eid))
|
||||
except (TypeError, ValueError):
|
||||
return validation_error_response(errors={"entry_ids": ["All entry ids must be integers"]})
|
||||
result = apply_bulk_time_entry_actions(ids, action, value, user_id=g.api_user.id, is_admin=g.api_user.is_admin)
|
||||
return validation_error_response(
|
||||
errors={"entry_ids": ["All entry ids must be integers"]}
|
||||
)
|
||||
result = apply_bulk_time_entry_actions(
|
||||
ids, action, value, user_id=g.api_user.id, is_admin=g.api_user.is_admin
|
||||
)
|
||||
if not result.get("success"):
|
||||
code = result.get("http_status", 400)
|
||||
return error_response(result.get("error", "Bulk operation failed"), status_code=code)
|
||||
return error_response(
|
||||
result.get("error", "Bulk operation failed"), status_code=code
|
||||
)
|
||||
return jsonify({"success": True, "affected": result.get("affected", 0)})
|
||||
|
||||
|
||||
@@ -151,7 +174,9 @@ def create_time_entry():
|
||||
|
||||
idem_key = normalize_idempotency_key(request.headers.get("Idempotency-Key"))
|
||||
if idem_key:
|
||||
existing = lookup_idempotent_response(g.api_token.id, SCOPE_POST_TIME_ENTRY, idem_key)
|
||||
existing = lookup_idempotent_response(
|
||||
g.api_token.id, SCOPE_POST_TIME_ENTRY, idem_key
|
||||
)
|
||||
if existing:
|
||||
status_code, body_json = existing
|
||||
return replay_response(status_code, body_json)
|
||||
@@ -193,9 +218,15 @@ def create_time_entry():
|
||||
from app.models import Activity
|
||||
from app.utils.audit import get_request_info
|
||||
|
||||
entity_name = entry.project.name if entry.project else (entry.client.name if entry.client else "Unknown")
|
||||
entity_name = (
|
||||
entry.project.name
|
||||
if entry.project
|
||||
else (entry.client.name if entry.client else "Unknown")
|
||||
)
|
||||
task_name = entry.task.name if entry.task else None
|
||||
duration_formatted = entry.duration_formatted if hasattr(entry, "duration_formatted") else "0:00"
|
||||
duration_formatted = (
|
||||
entry.duration_formatted if hasattr(entry, "duration_formatted") else "0:00"
|
||||
)
|
||||
ip_address, user_agent, _ = get_request_info()
|
||||
Activity.log(
|
||||
user_id=g.api_user.id,
|
||||
@@ -211,13 +242,18 @@ def create_time_entry():
|
||||
"client_name": entry.client.name if entry.client else None,
|
||||
"task_name": task_name,
|
||||
"duration_formatted": duration_formatted,
|
||||
"duration_hours": entry.duration_hours if hasattr(entry, "duration_hours") else None,
|
||||
"duration_hours": entry.duration_hours
|
||||
if hasattr(entry, "duration_hours")
|
||||
else None,
|
||||
},
|
||||
ip_address=ip_address,
|
||||
user_agent=user_agent,
|
||||
)
|
||||
|
||||
payload = {"message": "Time entry created successfully", "time_entry": result["entry"].to_dict()}
|
||||
payload = {
|
||||
"message": "Time entry created successfully",
|
||||
"time_entry": result["entry"].to_dict(),
|
||||
}
|
||||
resp = jsonify(payload)
|
||||
resp.status_code = 201
|
||||
if idem_key:
|
||||
@@ -226,10 +262,14 @@ def create_time_entry():
|
||||
from app import db
|
||||
|
||||
try:
|
||||
store_idempotent_response(g.api_token.id, SCOPE_POST_TIME_ENTRY, idem_key, 201, payload)
|
||||
store_idempotent_response(
|
||||
g.api_token.id, SCOPE_POST_TIME_ENTRY, idem_key, 201, payload
|
||||
)
|
||||
except IntegrityError:
|
||||
db.session.rollback()
|
||||
existing = lookup_idempotent_response(g.api_token.id, SCOPE_POST_TIME_ENTRY, idem_key)
|
||||
existing = lookup_idempotent_response(
|
||||
g.api_token.id, SCOPE_POST_TIME_ENTRY, idem_key
|
||||
)
|
||||
if existing:
|
||||
status_code, body_json = existing
|
||||
return replay_response(status_code, body_json)
|
||||
@@ -286,7 +326,11 @@ def update_time_entry(entry_id):
|
||||
from app.models import Activity
|
||||
from app.utils.audit import get_request_info
|
||||
|
||||
entity_name = entry.project.name if entry.project else (entry.client.name if entry.client else "Unknown")
|
||||
entity_name = (
|
||||
entry.project.name
|
||||
if entry.project
|
||||
else (entry.client.name if entry.client else "Unknown")
|
||||
)
|
||||
task_name = entry.task.name if entry.task else None
|
||||
ip_address, user_agent, _ = get_request_info()
|
||||
Activity.log(
|
||||
@@ -295,7 +339,8 @@ def update_time_entry(entry_id):
|
||||
entity_type="time_entry",
|
||||
entity_id=entry.id,
|
||||
entity_name=f"{entity_name}" + (f" - {task_name}" if task_name else ""),
|
||||
description=f"Updated time entry for {entity_name}" + (f" - {task_name}" if task_name else ""),
|
||||
description=f"Updated time entry for {entity_name}"
|
||||
+ (f" - {task_name}" if task_name else ""),
|
||||
extra_data={
|
||||
"project_name": entry.project.name if entry.project else None,
|
||||
"client_name": entry.client.name if entry.client else None,
|
||||
@@ -305,7 +350,12 @@ def update_time_entry(entry_id):
|
||||
user_agent=user_agent,
|
||||
)
|
||||
|
||||
return jsonify({"message": "Time entry updated successfully", "time_entry": result["entry"].to_dict()})
|
||||
return jsonify(
|
||||
{
|
||||
"message": "Time entry updated successfully",
|
||||
"time_entry": result["entry"].to_dict(),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@api_v1_time_entries_bp.route("/time-entries/<int:entry_id>", methods=["DELETE"])
|
||||
@@ -314,7 +364,8 @@ def delete_time_entry(entry_id):
|
||||
"""Delete a time entry."""
|
||||
from app.services import TimeTrackingService
|
||||
|
||||
data = request.get_json() or {}
|
||||
# Optional body — tolerate DELETE with no Content-Type set
|
||||
data = request.get_json(silent=True) or {}
|
||||
reason = data.get("reason")
|
||||
time_tracking_service = TimeTrackingService()
|
||||
result = time_tracking_service.delete_entry(
|
||||
@@ -379,7 +430,9 @@ def start_timer():
|
||||
result.get("message", "Could not start timer"),
|
||||
status_code=400,
|
||||
)
|
||||
return jsonify({"message": "Timer started successfully", "timer": result["timer"].to_dict()}), 201
|
||||
return jsonify(
|
||||
{"message": "Timer started successfully", "timer": result["timer"].to_dict()}
|
||||
), 201
|
||||
|
||||
|
||||
@api_v1_time_entries_bp.route("/timer/pause", methods=["POST"])
|
||||
@@ -413,7 +466,9 @@ def resume_timer():
|
||||
error_code=result.get("error", "resume_failed"),
|
||||
status_code=400,
|
||||
)
|
||||
return jsonify({"message": "Timer resumed", "time_entry": result["entry"].to_dict()})
|
||||
return jsonify(
|
||||
{"message": "Timer resumed", "time_entry": result["entry"].to_dict()}
|
||||
)
|
||||
|
||||
|
||||
@api_v1_time_entries_bp.route("/timer/stop", methods=["POST"])
|
||||
@@ -430,11 +485,18 @@ def stop_timer():
|
||||
status_code=400,
|
||||
)
|
||||
time_tracking_service = TimeTrackingService()
|
||||
result = time_tracking_service.stop_timer(user_id=g.api_user.id, entry_id=active_timer.id)
|
||||
result = time_tracking_service.stop_timer(
|
||||
user_id=g.api_user.id, entry_id=active_timer.id
|
||||
)
|
||||
if not result.get("success"):
|
||||
return error_response(
|
||||
result.get("message", "Could not stop timer"),
|
||||
error_code=result.get("error", "stop_failed"),
|
||||
status_code=400,
|
||||
)
|
||||
return jsonify({"message": "Timer stopped successfully", "time_entry": result["entry"].to_dict()})
|
||||
return jsonify(
|
||||
{
|
||||
"message": "Timer stopped successfully",
|
||||
"time_entry": result["entry"].to_dict(),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -13,7 +13,9 @@ from flask import url_for
|
||||
|
||||
@pytest.mark.integration
|
||||
@pytest.mark.routes
|
||||
def test_manual_entry_shows_single_client_prefilled(authenticated_client, app, user, test_client):
|
||||
def test_manual_entry_shows_single_client_prefilled(
|
||||
authenticated_client, app, user, test_client
|
||||
):
|
||||
"""When only one client exists, manual entry form shows pre-filled grayed-out client."""
|
||||
with app.app_context():
|
||||
# Ensure exactly one active client (test_client from fixture)
|
||||
@@ -35,15 +37,17 @@ def test_manual_entry_shows_single_client_prefilled(authenticated_client, app, u
|
||||
|
||||
@pytest.mark.integration
|
||||
@pytest.mark.routes
|
||||
def test_manual_entry_shows_select_when_multiple_clients(authenticated_client, app, user, test_client):
|
||||
def test_manual_entry_shows_select_when_multiple_clients(
|
||||
authenticated_client, app, user, test_client
|
||||
):
|
||||
"""When multiple clients exist, manual entry form shows normal client select."""
|
||||
with app.app_context():
|
||||
# Add a second client
|
||||
second = Client(
|
||||
name="Second Client",
|
||||
email="second@example.com",
|
||||
status="active",
|
||||
)
|
||||
second.status = "active"
|
||||
db.session.add(second)
|
||||
db.session.commit()
|
||||
|
||||
@@ -55,6 +59,6 @@ def test_manual_entry_shows_select_when_multiple_clients(authenticated_client, a
|
||||
html = response.get_data(as_text=True)
|
||||
|
||||
# Should have normal select, not single-client hidden + disabled
|
||||
assert '<select' in html
|
||||
assert "<select" in html
|
||||
assert 'id="client_id"' in html or 'name="client_id"' in html
|
||||
assert "Select a client" in html or "client" in html.lower()
|
||||
|
||||
@@ -8,7 +8,7 @@ pytestmark = [pytest.mark.api, pytest.mark.integration]
|
||||
|
||||
from datetime import date
|
||||
from decimal import Decimal
|
||||
from app.models import Expense, Project, ApiToken
|
||||
from app.models import Expense, ApiToken, User
|
||||
|
||||
|
||||
class TestAPIExpensesComplete:
|
||||
@@ -18,7 +18,9 @@ class TestAPIExpensesComplete:
|
||||
def api_token(self, app, user):
|
||||
"""Create an API token for testing"""
|
||||
token, plain_token = ApiToken.create_token(
|
||||
user_id=user.id, name="Test API Token", scopes="read:expenses,write:expenses"
|
||||
user_id=user.id,
|
||||
name="Test API Token",
|
||||
scopes="read:expenses,write:expenses",
|
||||
)
|
||||
from app import db
|
||||
|
||||
@@ -34,7 +36,9 @@ class TestAPIExpensesComplete:
|
||||
test_client.environ_base["HTTP_AUTHORIZATION"] = f"Bearer {plain_token}"
|
||||
return test_client
|
||||
|
||||
def test_list_expenses_with_filters(self, app, client_with_token, user, project, expense):
|
||||
def test_list_expenses_with_filters(
|
||||
self, app, client_with_token, user, project, expense
|
||||
):
|
||||
"""Test list_expenses with various filters"""
|
||||
# Filter by project
|
||||
response = client_with_token.get(f"/api/v1/expenses?project_id={project.id}")
|
||||
@@ -109,7 +113,7 @@ class TestAPIExpensesComplete:
|
||||
|
||||
def test_expense_permissions(self, app, user, project):
|
||||
"""Test expense access permissions"""
|
||||
from app.models import Expense, ApiToken
|
||||
from app.models import ApiToken
|
||||
from app import db
|
||||
|
||||
# Create expense for another user
|
||||
@@ -127,7 +131,9 @@ class TestAPIExpensesComplete:
|
||||
|
||||
# Create token for first user
|
||||
token, plain_token = ApiToken.create_token(
|
||||
user_id=user.id, name="Test Token", scopes="read:expenses,write:expenses"
|
||||
user_id=user.id,
|
||||
name="Test Token",
|
||||
scopes="read:expenses,write:expenses",
|
||||
)
|
||||
db.session.add(token)
|
||||
db.session.commit()
|
||||
|
||||
@@ -7,10 +7,13 @@ import pytest
|
||||
pytestmark = [pytest.mark.unit, pytest.mark.utils]
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import Mock, patch, MagicMock
|
||||
from flask import Flask, g
|
||||
from app.models import ApiToken, User
|
||||
from app.utils.api_auth import authenticate_token, require_api_token, extract_token_from_request
|
||||
from app.utils.api_auth import (
|
||||
authenticate_token,
|
||||
require_api_token,
|
||||
extract_token_from_request,
|
||||
)
|
||||
from app import db
|
||||
|
||||
|
||||
@@ -19,27 +22,28 @@ class TestExtractToken:
|
||||
|
||||
def test_extract_from_bearer_header(self):
|
||||
"""Test extracting token from Bearer Authorization header"""
|
||||
from flask import Flask, request
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
with app.test_request_context(headers={"Authorization": "Bearer tt_testtoken123"}):
|
||||
with app.test_request_context(
|
||||
headers={"Authorization": "Bearer tt_testtoken123"}
|
||||
):
|
||||
token = extract_token_from_request()
|
||||
assert token == "tt_testtoken123"
|
||||
|
||||
def test_extract_from_token_header(self):
|
||||
"""Test extracting token from Token Authorization header"""
|
||||
from flask import Flask, request
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
with app.test_request_context(headers={"Authorization": "Token tt_testtoken123"}):
|
||||
with app.test_request_context(
|
||||
headers={"Authorization": "Token tt_testtoken123"}
|
||||
):
|
||||
token = extract_token_from_request()
|
||||
assert token == "tt_testtoken123"
|
||||
|
||||
def test_extract_from_api_key_header(self):
|
||||
"""Test extracting token from X-API-Key header"""
|
||||
from flask import Flask, request
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
@@ -49,7 +53,6 @@ class TestExtractToken:
|
||||
|
||||
def test_extract_none_when_missing(self):
|
||||
"""Test that None is returned when no token is present"""
|
||||
from flask import Flask, request
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
@@ -64,7 +67,8 @@ class TestAuthenticateToken:
|
||||
@pytest.fixture
|
||||
def sample_user(self, app):
|
||||
"""Create a sample user for testing"""
|
||||
user = User(username="testuser", is_active=True)
|
||||
user = User(username="testuser")
|
||||
user.is_active = True
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
return user
|
||||
@@ -73,7 +77,10 @@ class TestAuthenticateToken:
|
||||
def sample_token(self, app, sample_user):
|
||||
"""Create a sample API token"""
|
||||
token, plain_token = ApiToken.create_token(
|
||||
user_id=sample_user.id, name="Test Token", scopes="read:projects", expires_days=30
|
||||
user_id=sample_user.id,
|
||||
name="Test Token",
|
||||
scopes="read:projects",
|
||||
expires_days=30,
|
||||
)
|
||||
db.session.add(token)
|
||||
db.session.commit()
|
||||
@@ -83,7 +90,7 @@ class TestAuthenticateToken:
|
||||
"""Test authentication with valid token"""
|
||||
token, plain_token = sample_token
|
||||
|
||||
with app.test_request_context(remote_addr="127.0.0.1"):
|
||||
with app.test_request_context(environ_overrides={"REMOTE_ADDR": "127.0.0.1"}):
|
||||
user, api_token, error = authenticate_token(plain_token)
|
||||
|
||||
assert user is not None
|
||||
@@ -95,13 +102,15 @@ class TestAuthenticateToken:
|
||||
def test_authenticate_expired_token(self, app, sample_user):
|
||||
"""Test authentication with expired token"""
|
||||
token, plain_token = ApiToken.create_token(
|
||||
user_id=sample_user.id, name="Expired Token", expires_days=-1 # Expired
|
||||
user_id=sample_user.id,
|
||||
name="Expired Token",
|
||||
expires_days=-1, # Expired
|
||||
)
|
||||
token.expires_at = datetime.utcnow() - timedelta(days=1)
|
||||
db.session.add(token)
|
||||
db.session.commit()
|
||||
|
||||
with app.test_request_context(remote_addr="127.0.0.1"):
|
||||
with app.test_request_context(environ_overrides={"REMOTE_ADDR": "127.0.0.1"}):
|
||||
user, api_token, error = authenticate_token(plain_token)
|
||||
|
||||
assert user is None
|
||||
@@ -114,7 +123,7 @@ class TestAuthenticateToken:
|
||||
token.is_active = False
|
||||
db.session.commit()
|
||||
|
||||
with app.test_request_context(remote_addr="127.0.0.1"):
|
||||
with app.test_request_context(environ_overrides={"REMOTE_ADDR": "127.0.0.1"}):
|
||||
user, api_token, error = authenticate_token(plain_token)
|
||||
|
||||
assert user is None
|
||||
@@ -130,7 +139,7 @@ class TestAuthenticateToken:
|
||||
db.session.add(token)
|
||||
db.session.commit()
|
||||
|
||||
with app.test_request_context(remote_addr="127.0.0.1"):
|
||||
with app.test_request_context(environ_overrides={"REMOTE_ADDR": "127.0.0.1"}):
|
||||
user, api_token, error = authenticate_token(plain_token)
|
||||
|
||||
assert user is not None
|
||||
@@ -146,7 +155,7 @@ class TestAuthenticateToken:
|
||||
db.session.add(token)
|
||||
db.session.commit()
|
||||
|
||||
with app.test_request_context(remote_addr="10.0.0.1"):
|
||||
with app.test_request_context(environ_overrides={"REMOTE_ADDR": "10.0.0.1"}):
|
||||
user, api_token, error = authenticate_token(plain_token)
|
||||
|
||||
assert user is None
|
||||
@@ -155,12 +164,14 @@ class TestAuthenticateToken:
|
||||
|
||||
def test_authenticate_with_cidr_block(self, app, sample_user):
|
||||
"""Test authentication with CIDR block in whitelist"""
|
||||
token, plain_token = ApiToken.create_token(user_id=sample_user.id, name="CIDR Token", scopes="read:projects")
|
||||
token, plain_token = ApiToken.create_token(
|
||||
user_id=sample_user.id, name="CIDR Token", scopes="read:projects"
|
||||
)
|
||||
token.ip_whitelist = "192.168.1.0/24"
|
||||
db.session.add(token)
|
||||
db.session.commit()
|
||||
|
||||
with app.test_request_context(remote_addr="192.168.1.100"):
|
||||
with app.test_request_context(environ_overrides={"REMOTE_ADDR": "192.168.1.100"}):
|
||||
user, api_token, error = authenticate_token(plain_token)
|
||||
|
||||
assert user is not None
|
||||
@@ -169,7 +180,7 @@ class TestAuthenticateToken:
|
||||
|
||||
def test_authenticate_invalid_token_format(self, app):
|
||||
"""Test authentication with invalid token format"""
|
||||
with app.test_request_context(remote_addr="127.0.0.1"):
|
||||
with app.test_request_context(environ_overrides={"REMOTE_ADDR": "127.0.0.1"}):
|
||||
user, api_token, error = authenticate_token("invalid_token")
|
||||
|
||||
assert user is None
|
||||
@@ -180,7 +191,7 @@ class TestAuthenticateToken:
|
||||
"""Test authentication with non-existent token"""
|
||||
fake_token = "tt_" + "x" * 32
|
||||
|
||||
with app.test_request_context(remote_addr="127.0.0.1"):
|
||||
with app.test_request_context(environ_overrides={"REMOTE_ADDR": "127.0.0.1"}):
|
||||
user, api_token, error = authenticate_token(fake_token)
|
||||
|
||||
assert user is None
|
||||
@@ -193,7 +204,7 @@ class TestAuthenticateToken:
|
||||
token.user.is_active = False
|
||||
db.session.commit()
|
||||
|
||||
with app.test_request_context(remote_addr="127.0.0.1"):
|
||||
with app.test_request_context(environ_overrides={"REMOTE_ADDR": "127.0.0.1"}):
|
||||
user, api_token, error = authenticate_token(plain_token)
|
||||
|
||||
assert user is None
|
||||
@@ -220,12 +231,16 @@ class TestRequireApiToken:
|
||||
|
||||
return app
|
||||
|
||||
def test_protected_route_with_valid_token(self, app_with_routes, sample_user, sample_token):
|
||||
def test_protected_route_with_valid_token(
|
||||
self, app_with_routes, sample_user, sample_token
|
||||
):
|
||||
"""Test accessing protected route with valid token"""
|
||||
token, plain_token = sample_token
|
||||
|
||||
with app_with_routes.test_client() as client:
|
||||
response = client.get("/test/protected", headers={"Authorization": f"Bearer {plain_token}"})
|
||||
response = client.get(
|
||||
"/test/protected", headers={"Authorization": f"Bearer {plain_token}"}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
@@ -242,16 +257,22 @@ class TestRequireApiToken:
|
||||
assert "error" in data
|
||||
assert "Authentication required" in data["error"]
|
||||
|
||||
def test_protected_route_with_insufficient_scope(self, app_with_routes, sample_user):
|
||||
def test_protected_route_with_insufficient_scope(
|
||||
self, app_with_routes, sample_user
|
||||
):
|
||||
"""Test accessing protected route with insufficient scope"""
|
||||
token, plain_token = ApiToken.create_token(
|
||||
user_id=sample_user.id, name="Limited Token", scopes="read:time_entries" # Different scope
|
||||
user_id=sample_user.id,
|
||||
name="Limited Token",
|
||||
scopes="read:time_entries", # Different scope
|
||||
)
|
||||
db.session.add(token)
|
||||
db.session.commit()
|
||||
|
||||
with app_with_routes.test_client() as client:
|
||||
response = client.get("/test/protected", headers={"Authorization": f"Bearer {plain_token}"})
|
||||
response = client.get(
|
||||
"/test/protected", headers={"Authorization": f"Bearer {plain_token}"}
|
||||
)
|
||||
|
||||
assert response.status_code == 403
|
||||
data = response.get_json()
|
||||
@@ -261,13 +282,17 @@ class TestRequireApiToken:
|
||||
def test_protected_route_with_wildcard_scope(self, app_with_routes, sample_user):
|
||||
"""Test accessing protected route with wildcard scope"""
|
||||
token, plain_token = ApiToken.create_token(
|
||||
user_id=sample_user.id, name="Admin Token", scopes="read:*" # Wildcard scope
|
||||
user_id=sample_user.id,
|
||||
name="Admin Token",
|
||||
scopes="read:*", # Wildcard scope
|
||||
)
|
||||
db.session.add(token)
|
||||
db.session.commit()
|
||||
|
||||
with app_with_routes.test_client() as client:
|
||||
response = client.get("/test/protected", headers={"Authorization": f"Bearer {plain_token}"})
|
||||
response = client.get(
|
||||
"/test/protected", headers={"Authorization": f"Bearer {plain_token}"}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
|
||||
Reference in New Issue
Block a user