refactor(app): wire API, tasks, mileage, custom reports to blueprint registry

- Update api, api_docs, api_v1, custom_reports, mileage, tasks routes
- Update custom_report_service and time_tracking_service
- Export services in app/services/__init__.py
This commit is contained in:
Dries Peeters
2026-03-06 15:45:31 +01:00
parent 435e53957c
commit 2a283c3074
9 changed files with 874 additions and 24 deletions
+18 -2
View File
@@ -1429,12 +1429,28 @@ def get_entry(entry_id):
@api_bp.route("/api/users")
@login_required
def get_users():
"""Get active users (admin only)"""
"""Get active users (admin only). Uses a single aggregate query for total_hours to avoid N+1."""
if not current_user.is_admin:
return jsonify({"error": "Access denied"}), 403
users = User.query.filter_by(is_active=True).order_by(User.username).all()
return jsonify({"users": [user.to_dict() for user in users]})
if not users:
return jsonify({"users": []})
user_ids = [u.id for u in users]
rows = (
db.session.query(TimeEntry.user_id, db.func.sum(TimeEntry.duration_seconds))
.filter(
TimeEntry.user_id.in_(user_ids),
TimeEntry.end_time.isnot(None),
)
.group_by(TimeEntry.user_id)
.all()
)
total_hours_by_user = {uid: round((total_seconds or 0) / 3600, 2) for uid, total_seconds in rows}
return jsonify({
"users": [user.to_dict(total_hours_override=total_hours_by_user.get(user.id)) for user in users]
})
@api_bp.route("/api/stats")
+1 -1
View File
@@ -33,7 +33,7 @@ def openapi_spec():
"openapi": "3.0.0",
"info": {
"title": "TimeTracker REST API",
"version": "1.0.0",
"version": "4.20.9",
"description": """
# TimeTracker REST API
+591 -4
View File
@@ -1,6 +1,6 @@
"""REST API v1 - Comprehensive API endpoints with token authentication"""
from flask import Blueprint, jsonify, request, current_app, g
from flask import Blueprint, jsonify, request, current_app, g, Response
from app import db, limiter
from app.models import (
User,
@@ -57,7 +57,7 @@ from app.utils.api_responses import (
forbidden_response,
not_found_response,
)
from datetime import datetime, timedelta
from datetime import datetime, timedelta, date
from sqlalchemy import func, or_
from app.utils.timezone import get_app_timezone, parse_local_datetime, utc_to_local
from app.models.time_entry import local_now
@@ -230,6 +230,21 @@ def api_info():
"webhooks": "/api/v1/webhooks",
"users": "/api/v1/users",
"reports": "/api/v1/reports",
"timesheet_periods": "/api/v1/timesheet-periods",
"timesheet_policy": "/api/v1/timesheet-policy",
"time_off": {
"leave_types": "/api/v1/time-off/leave-types",
"requests": "/api/v1/time-off/requests",
"balances": "/api/v1/time-off/balances",
"holidays": "/api/v1/time-off/holidays",
},
"payroll_export": "/api/v1/exports/payroll",
"capacity_report": "/api/v1/reports/capacity",
"compliance_reports": {
"locked_periods": "/api/v1/reports/compliance/locked-periods",
"audit_events": "/api/v1/reports/compliance/audit-events",
},
"mileage_gps": "/api/v1/mileage/gps",
"search": "/api/v1/search",
"inventory": {
"items": "/api/v1/inventory/items",
@@ -286,12 +301,13 @@ def auth_login():
return jsonify({"error": "Invalid username or password"}), 401
scopes = "read:projects,read:tasks,read:time_entries,write:time_entries"
expiry_days = current_app.config.get("API_TOKEN_DEFAULT_EXPIRY_DAYS", 90)
api_token, plain_token = ApiToken.create_token(
user_id=user.id,
name=f"Mobile app - {user.username}",
description="Token issued via mobile/app login",
scopes=scopes,
expires_days=None,
expires_days=expiry_days if expiry_days else None,
)
db.session.add(api_token)
db.session.commit()
@@ -5383,8 +5399,26 @@ def list_users():
# Paginate
result = paginate_query(query)
items = result["items"]
if not items:
return jsonify({"users": [], "pagination": result["pagination"]})
return jsonify({"users": [u.to_dict() for u in result["items"]], "pagination": result["pagination"]})
# Single aggregate query for total_hours to avoid N+1
user_ids = [u.id for u in items]
rows = (
db.session.query(TimeEntry.user_id, db.func.sum(TimeEntry.duration_seconds))
.filter(
TimeEntry.user_id.in_(user_ids),
TimeEntry.end_time.isnot(None),
)
.group_by(TimeEntry.user_id)
.all()
)
total_hours_by_user = {uid: round((total_seconds or 0) / 3600, 2) for uid, total_seconds in rows}
return jsonify({
"users": [u.to_dict(total_hours_override=total_hours_by_user.get(u.id)) for u in items],
"pagination": result["pagination"],
})
# ==================== Webhooks ====================
@@ -6772,6 +6806,559 @@ def search():
return jsonify({"results": results, "query": query, "count": len(results)})
# ==================== Timesheet Governance ====================
def _is_api_approver(user) -> bool:
if user.is_admin:
return True
try:
from app.services.workforce_governance_service import WorkforceGovernanceService
policy = WorkforceGovernanceService().get_or_create_default_policy()
return user.id in policy.get_approver_ids()
except Exception:
return False
@api_v1_bp.route("/timesheet-periods", methods=["GET"])
@require_api_token("read:time_entries")
def list_timesheet_periods():
from app.services.workforce_governance_service import WorkforceGovernanceService
service = WorkforceGovernanceService()
user_id = request.args.get("user_id", type=int)
if not g.api_user.is_admin:
user_id = g.api_user.id
status = request.args.get("status")
start = _parse_date(request.args.get("start_date"))
end = _parse_date(request.args.get("end_date"))
periods = service.list_periods(user_id=user_id, status=status, period_start=start, period_end=end)
return jsonify({"timesheet_periods": [p.to_dict() for p in periods]})
@api_v1_bp.route("/timesheet-periods", methods=["POST"])
@require_api_token("write:time_entries")
def create_or_get_timesheet_period():
from app.services.workforce_governance_service import WorkforceGovernanceService
data = request.get_json() or {}
ref = _parse_date(data.get("reference_date")) or date.today()
period_type = (data.get("period_type") or "weekly").strip().lower()
user_id = data.get("user_id") if g.api_user.is_admin else g.api_user.id
user_id = int(user_id)
period = WorkforceGovernanceService().get_or_create_period_for_date(user_id=user_id, reference=ref, period_type=period_type)
return jsonify({"timesheet_period": period.to_dict()}), 201
@api_v1_bp.route("/timesheet-periods/<int:period_id>/submit", methods=["POST"])
@require_api_token("write:time_entries")
def submit_timesheet_period(period_id):
from app.services.workforce_governance_service import WorkforceGovernanceService
result = WorkforceGovernanceService().submit_period(period_id=period_id, actor_id=g.api_user.id)
if not result.get("success"):
return jsonify({"error": result.get("message", "Could not submit period")}), 400
return jsonify({"message": "Timesheet period submitted", "timesheet_period": result["period"].to_dict()})
@api_v1_bp.route("/timesheet-periods/<int:period_id>/approve", methods=["POST"])
@require_api_token("write:time_entries")
def approve_timesheet_period(period_id):
from app.services.workforce_governance_service import WorkforceGovernanceService
if not _is_api_approver(g.api_user):
return jsonify({"error": "Access denied"}), 403
data = request.get_json() or {}
result = WorkforceGovernanceService().approve_period(period_id=period_id, approver_id=g.api_user.id, comment=data.get("comment"))
if not result.get("success"):
return jsonify({"error": result.get("message", "Could not approve period")}), 400
return jsonify({"message": "Timesheet period approved", "timesheet_period": result["period"].to_dict()})
@api_v1_bp.route("/timesheet-periods/<int:period_id>/reject", methods=["POST"])
@require_api_token("write:time_entries")
def reject_timesheet_period(period_id):
from app.services.workforce_governance_service import WorkforceGovernanceService
if not _is_api_approver(g.api_user):
return jsonify({"error": "Access denied"}), 403
data = request.get_json() or {}
reason = (data.get("reason") or "").strip()
if not reason:
return jsonify({"error": "reason is required"}), 400
result = WorkforceGovernanceService().reject_period(period_id=period_id, approver_id=g.api_user.id, reason=reason)
if not result.get("success"):
return jsonify({"error": result.get("message", "Could not reject period")}), 400
return jsonify({"message": "Timesheet period rejected", "timesheet_period": result["period"].to_dict()})
@api_v1_bp.route("/timesheet-periods/<int:period_id>/close", methods=["POST"])
@require_api_token("write:time_entries")
def close_timesheet_period(period_id):
from app.services.workforce_governance_service import WorkforceGovernanceService
if not g.api_user.is_admin:
return jsonify({"error": "Only admins can close periods"}), 403
data = request.get_json() or {}
result = WorkforceGovernanceService().close_period(period_id=period_id, closer_id=g.api_user.id, reason=data.get("reason"))
if not result.get("success"):
return jsonify({"error": result.get("message", "Could not close period")}), 400
return jsonify({"message": "Timesheet period closed", "timesheet_period": result["period"].to_dict()})
@api_v1_bp.route("/timesheet-policy", methods=["GET"])
@require_api_token("read:time_entries")
def get_timesheet_policy():
from app.services.workforce_governance_service import WorkforceGovernanceService
if not _is_api_approver(g.api_user):
return jsonify({"error": "Access denied"}), 403
policy = WorkforceGovernanceService().get_or_create_default_policy()
return jsonify({"timesheet_policy": policy.to_dict()})
@api_v1_bp.route("/timesheet-policy", methods=["PUT", "PATCH"])
@require_api_token("write:time_entries")
def update_timesheet_policy():
from app.services.workforce_governance_service import WorkforceGovernanceService
if not g.api_user.is_admin:
return jsonify({"error": "Access denied"}), 403
service = WorkforceGovernanceService()
policy = service.get_or_create_default_policy()
data = request.get_json() or {}
if "default_period_type" in data:
policy.default_period_type = (data.get("default_period_type") or "weekly").strip().lower()
if "auto_lock_days" in data:
policy.auto_lock_days = data.get("auto_lock_days")
if "approver_user_ids" in data:
ids = data.get("approver_user_ids") or []
if isinstance(ids, list):
policy.approver_user_ids = ",".join(str(int(x)) for x in ids if str(x).strip())
if "enable_multi_level_approval" in data:
policy.enable_multi_level_approval = bool(data.get("enable_multi_level_approval"))
if "require_rejection_comment" in data:
policy.require_rejection_comment = bool(data.get("require_rejection_comment"))
if "enable_admin_override" in data:
policy.enable_admin_override = bool(data.get("enable_admin_override"))
db.session.commit()
return jsonify({"message": "Timesheet policy updated", "timesheet_policy": policy.to_dict()})
# ==================== Time Off ====================
@api_v1_bp.route("/time-off/leave-types", methods=["GET"])
@require_api_token("read:reports")
def list_leave_types_api():
from app.services.workforce_governance_service import WorkforceGovernanceService
enabled_only = request.args.get("enabled_only", "true").lower() == "true"
items = WorkforceGovernanceService().list_leave_types(enabled_only=enabled_only)
return jsonify({"leave_types": [i.to_dict() for i in items]})
@api_v1_bp.route("/time-off/leave-types", methods=["POST"])
@require_api_token("write:reports")
def create_leave_type_api():
from app.models.time_off import LeaveType
if not g.api_user.is_admin:
return jsonify({"error": "Access denied"}), 403
data = request.get_json() or {}
name = (data.get("name") or "").strip()
code = (data.get("code") or "").strip().lower()
if not name or not code:
return jsonify({"error": "name and code are required"}), 400
leave_type = LeaveType(
name=name,
code=code,
is_paid=bool(data.get("is_paid", True)),
annual_allowance_hours=data.get("annual_allowance_hours"),
accrual_hours_per_month=data.get("accrual_hours_per_month"),
enabled=bool(data.get("enabled", True)),
)
db.session.add(leave_type)
db.session.commit()
return jsonify({"message": "Leave type created", "leave_type": leave_type.to_dict()}), 201
@api_v1_bp.route("/time-off/requests", methods=["GET"])
@require_api_token("read:time_entries")
def list_time_off_requests_api():
from app.models.time_off import TimeOffRequest
q = TimeOffRequest.query
if not g.api_user.is_admin:
q = q.filter(TimeOffRequest.user_id == g.api_user.id)
else:
user_id = request.args.get("user_id", type=int)
if user_id:
q = q.filter(TimeOffRequest.user_id == user_id)
status = request.args.get("status")
if status:
q = q.filter(TimeOffRequest.status == status)
start = _parse_date(request.args.get("start_date"))
end = _parse_date(request.args.get("end_date"))
if start:
q = q.filter(TimeOffRequest.end_date >= start)
if end:
q = q.filter(TimeOffRequest.start_date <= end)
items = q.order_by(TimeOffRequest.start_date.desc()).all()
return jsonify({"time_off_requests": [i.to_dict() for i in items]})
@api_v1_bp.route("/time-off/requests", methods=["POST"])
@require_api_token("write:time_entries")
def create_time_off_request_api():
from app.services.workforce_governance_service import WorkforceGovernanceService
data = request.get_json() or {}
leave_type_id = data.get("leave_type_id")
start = _parse_date(data.get("start_date"))
end = _parse_date(data.get("end_date"))
if not leave_type_id or not start or not end:
return jsonify({"error": "leave_type_id, start_date and end_date are required"}), 400
requested_hours = data.get("requested_hours")
if requested_hours is not None:
try:
from decimal import Decimal
requested_hours = Decimal(str(requested_hours))
except Exception:
return jsonify({"error": "requested_hours must be numeric"}), 400
result = WorkforceGovernanceService().create_leave_request(
user_id=g.api_user.id,
leave_type_id=int(leave_type_id),
start_date=start,
end_date=end,
requested_hours=requested_hours,
comment=data.get("comment"),
submit_now=bool(data.get("submit", True)),
)
if not result.get("success"):
return jsonify({"error": result.get("message", "Could not create request")}), 400
return jsonify({"message": "Time-off request created", "time_off_request": result["request"].to_dict()}), 201
@api_v1_bp.route("/time-off/requests/<int:request_id>/approve", methods=["POST"])
@require_api_token("write:time_entries")
def approve_time_off_request_api(request_id):
from app.services.workforce_governance_service import WorkforceGovernanceService
if not _is_api_approver(g.api_user):
return jsonify({"error": "Access denied"}), 403
data = request.get_json() or {}
result = WorkforceGovernanceService().review_leave_request(
request_id=request_id, reviewer_id=g.api_user.id, approve=True, comment=data.get("comment")
)
if not result.get("success"):
return jsonify({"error": result.get("message", "Could not approve request")}), 400
return jsonify({"message": "Time-off request approved", "time_off_request": result["request"].to_dict()})
@api_v1_bp.route("/time-off/requests/<int:request_id>/reject", methods=["POST"])
@require_api_token("write:time_entries")
def reject_time_off_request_api(request_id):
from app.services.workforce_governance_service import WorkforceGovernanceService
if not _is_api_approver(g.api_user):
return jsonify({"error": "Access denied"}), 403
data = request.get_json() or {}
result = WorkforceGovernanceService().review_leave_request(
request_id=request_id, reviewer_id=g.api_user.id, approve=False, comment=data.get("comment")
)
if not result.get("success"):
return jsonify({"error": result.get("message", "Could not reject request")}), 400
return jsonify({"message": "Time-off request rejected", "time_off_request": result["request"].to_dict()})
@api_v1_bp.route("/time-off/balances", methods=["GET"])
@require_api_token("read:time_entries")
def time_off_balances_api():
from app.services.workforce_governance_service import WorkforceGovernanceService
user_id = request.args.get("user_id", type=int)
if not user_id or not g.api_user.is_admin:
user_id = g.api_user.id
balances = WorkforceGovernanceService().get_leave_balance(user_id=user_id)
return jsonify({"balances": balances})
@api_v1_bp.route("/time-off/holidays", methods=["GET"])
@require_api_token("read:reports")
def list_holidays_api():
from app.models.time_off import CompanyHoliday
q = CompanyHoliday.query
start = _parse_date(request.args.get("start_date"))
end = _parse_date(request.args.get("end_date"))
if start:
q = q.filter(CompanyHoliday.end_date >= start)
if end:
q = q.filter(CompanyHoliday.start_date <= end)
items = q.order_by(CompanyHoliday.start_date.asc()).all()
return jsonify({"holidays": [i.to_dict() for i in items]})
@api_v1_bp.route("/time-off/holidays", methods=["POST"])
@require_api_token("write:reports")
def create_holiday_api():
from app.models.time_off import CompanyHoliday
if not g.api_user.is_admin:
return jsonify({"error": "Access denied"}), 403
data = request.get_json() or {}
name = (data.get("name") or "").strip()
start = _parse_date(data.get("start_date"))
end = _parse_date(data.get("end_date"))
if not name or not start or not end:
return jsonify({"error": "name, start_date and end_date are required"}), 400
holiday = CompanyHoliday(name=name, start_date=start, end_date=end, region=data.get("region"), enabled=bool(data.get("enabled", True)))
db.session.add(holiday)
db.session.commit()
return jsonify({"message": "Holiday created", "holiday": holiday.to_dict()}), 201
# ==================== Payroll Export ====================
@api_v1_bp.route("/exports/payroll", methods=["GET"])
@require_api_token("read:reports")
def export_payroll_csv():
from app.services.workforce_governance_service import WorkforceGovernanceService
import csv
import io
start = _parse_date(request.args.get("start_date"))
end = _parse_date(request.args.get("end_date"))
if not start or not end:
return jsonify({"error": "start_date and end_date are required (YYYY-MM-DD)"}), 400
user_id = request.args.get("user_id", type=int)
if not g.api_user.is_admin:
user_id = g.api_user.id
approved_only = request.args.get("approved_only", "false").lower() == "true"
closed_only = request.args.get("closed_only", "false").lower() == "true"
rows = WorkforceGovernanceService().payroll_rows(
start_date=start,
end_date=end,
user_id=user_id,
approved_only=approved_only,
closed_only=closed_only,
)
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(
[
"user_id",
"username",
"week_year",
"week_number",
"period_start",
"period_end",
"hours",
"billable_hours",
"non_billable_hours",
]
)
for row in rows:
writer.writerow(
[
row.get("user_id"),
row.get("username"),
row.get("week_year"),
row.get("week_number"),
row.get("period_start"),
row.get("period_end"),
row.get("hours"),
row.get("billable_hours"),
row.get("non_billable_hours"),
]
)
filename = f"payroll_export_{start.isoformat()}_{end.isoformat()}.csv"
return Response(
output.getvalue(),
mimetype="text/csv",
headers={"Content-Disposition": f"attachment; filename={filename}"},
)
# ==================== Capacity and Compliance ====================
@api_v1_bp.route("/reports/capacity", methods=["GET"])
@require_api_token("read:reports")
def capacity_report_api():
from app.services.workforce_governance_service import WorkforceGovernanceService
start = _parse_date(request.args.get("start_date"))
end = _parse_date(request.args.get("end_date"))
if not start or not end:
return jsonify({"error": "start_date and end_date are required"}), 400
team_user_ids = request.args.get("user_ids")
parsed_user_ids = None
if team_user_ids:
parsed_user_ids = []
for raw in team_user_ids.split(","):
raw = raw.strip()
if raw:
try:
parsed_user_ids.append(int(raw))
except ValueError:
pass
if not g.api_user.is_admin:
parsed_user_ids = [g.api_user.id]
rows = WorkforceGovernanceService().capacity_report(start_date=start, end_date=end, team_user_ids=parsed_user_ids)
return jsonify({"capacity": rows, "start_date": start.isoformat(), "end_date": end.isoformat()})
@api_v1_bp.route("/reports/compliance/locked-periods", methods=["GET"])
@require_api_token("read:reports")
def compliance_locked_periods_api():
from app.services.workforce_governance_service import WorkforceGovernanceService
if not _is_api_approver(g.api_user):
return jsonify({"error": "Access denied"}), 403
start = _parse_date(request.args.get("start_date"))
end = _parse_date(request.args.get("end_date"))
rows = WorkforceGovernanceService().locked_periods_report(start_date=start, end_date=end)
return jsonify({"locked_periods": rows})
@api_v1_bp.route("/reports/compliance/audit-events", methods=["GET"])
@require_api_token("read:reports")
def compliance_audit_events_api():
from app.services.workforce_governance_service import WorkforceGovernanceService
if not _is_api_approver(g.api_user):
return jsonify({"error": "Access denied"}), 403
start = _parse_date(request.args.get("start_date"))
end = _parse_date(request.args.get("end_date"))
user_id = request.args.get("user_id", type=int)
rows = WorkforceGovernanceService().compliance_audit_events(start_date=start, end_date=end, user_id=user_id)
return jsonify({"audit_events": rows})
# ==================== GPS Mileage Tracking ====================
@api_v1_bp.route("/mileage/gps/start", methods=["POST"])
@require_api_token("write:expenses")
def mileage_gps_start_api():
from app.services.gps_tracking_service import GPSTrackingService
data = request.get_json() or {}
result = GPSTrackingService().start_tracking(
user_id=g.api_user.id,
latitude=data.get("latitude"),
longitude=data.get("longitude"),
location=data.get("location"),
)
if not result.get("success"):
return jsonify({"error": result.get("message", "Could not start GPS tracking")}), 400
return jsonify(result), 201
@api_v1_bp.route("/mileage/gps/<int:track_id>/point", methods=["POST"])
@require_api_token("write:expenses")
def mileage_gps_add_point_api(track_id):
from app.services.gps_tracking_service import GPSTrackingService
data = request.get_json() or {}
latitude = data.get("latitude")
longitude = data.get("longitude")
if latitude is None or longitude is None:
return jsonify({"error": "latitude and longitude are required"}), 400
result = GPSTrackingService().add_track_point(track_id=track_id, latitude=latitude, longitude=longitude)
if not result.get("success"):
return jsonify({"error": result.get("message", "Could not add GPS point")}), 400
return jsonify(result)
@api_v1_bp.route("/mileage/gps/<int:track_id>/stop", methods=["POST"])
@require_api_token("write:expenses")
def mileage_gps_stop_api(track_id):
from app.services.gps_tracking_service import GPSTrackingService
data = request.get_json() or {}
result = GPSTrackingService().stop_tracking(
track_id=track_id,
latitude=data.get("latitude"),
longitude=data.get("longitude"),
location=data.get("location"),
)
if not result.get("success"):
return jsonify({"error": result.get("message", "Could not stop GPS tracking")}), 400
return jsonify(result)
@api_v1_bp.route("/mileage/gps/<int:track_id>/expense", methods=["POST"])
@require_api_token("write:expenses")
def mileage_gps_create_expense_api(track_id):
from app.services.gps_tracking_service import GPSTrackingService
data = request.get_json() or {}
result = GPSTrackingService().create_expense_from_track(
track_id=track_id,
project_id=data.get("project_id"),
rate_per_km=data.get("rate_per_km"),
)
if not result.get("success"):
return jsonify({"error": result.get("message", "Could not create expense from GPS track")}), 400
return jsonify(result)
@api_v1_bp.route("/mileage/gps", methods=["GET"])
@require_api_token("read:expenses")
def mileage_gps_list_api():
from app.services.gps_tracking_service import GPSTrackingService
start = parse_datetime(request.args.get("start_date")) if request.args.get("start_date") else None
end = parse_datetime(request.args.get("end_date")) if request.args.get("end_date") else None
user_id = request.args.get("user_id", type=int)
if not user_id or not g.api_user.is_admin:
user_id = g.api_user.id
tracks = GPSTrackingService().get_user_tracks(user_id=user_id, start_date=start, end_date=end)
return jsonify({"tracks": tracks})
# ==================== Error Handlers ====================
+40 -8
View File
@@ -15,6 +15,9 @@ from app.utils.module_helpers import module_enabled
custom_reports_bp = Blueprint("custom_reports", __name__)
# Maximum rows returned by report preview/data endpoints to avoid heavy in-memory processing
REPORT_DATA_LIMIT = 2000
@custom_reports_bp.route("/reports/builder")
@custom_reports_bp.route("/reports/builder/<int:view_id>/edit")
@@ -357,8 +360,8 @@ def generate_report_data(config, user_id=None):
# Apply custom field filter if provided
if custom_field_filter:
# Get all entries first, then filter by custom fields
all_entries = query.all()
# Get entries with limit, then filter by custom fields
all_entries = query.order_by(TimeEntry.start_time.desc()).limit(REPORT_DATA_LIMIT).all()
entries = []
for entry in all_entries:
client = None
@@ -377,7 +380,34 @@ def generate_report_data(config, user_id=None):
if matches:
entries.append(entry)
else:
entries = query.all()
entries = query.order_by(TimeEntry.start_time.desc()).limit(REPORT_DATA_LIMIT).all()
# Summary: use SQL aggregate when standard path (no unpaid_only, no custom_field_filter)
if not unpaid_only and not custom_field_filter:
from sqlalchemy import func
summary_query = db.session.query(
func.count(TimeEntry.id).label("total_entries"),
func.coalesce(func.sum(TimeEntry.duration_seconds), 0).label("total_seconds"),
).filter(
TimeEntry.end_time.isnot(None),
TimeEntry.start_time >= start_dt,
TimeEntry.start_time <= end_dt,
)
if user_id:
from app.models import User as U
u = U.query.get(user_id)
if u and not u.is_admin:
summary_query = summary_query.filter(TimeEntry.user_id == user_id)
if project_id:
summary_query = summary_query.filter(TimeEntry.project_id == project_id)
if filters.get("user_id"):
summary_query = summary_query.filter(TimeEntry.user_id == filters["user_id"])
total_count, total_seconds = summary_query.one()
summary_total_entries = total_count
summary_total_hours = round((total_seconds or 0) / 3600, 2)
else:
summary_total_entries = len(entries)
summary_total_hours = round(sum(e.duration_hours or 0 for e in entries), 2)
# Build response data
client_data = {}
@@ -419,11 +449,13 @@ def generate_report_data(config, user_id=None):
return {
"data": data_list,
"summary": {
"total_entries": len(entries),
"total_hours": round(sum(e.duration_hours or 0 for e in entries), 2),
"total_entries": summary_total_entries,
"total_hours": summary_total_hours,
"unpaid_only": unpaid_only,
"by_client": client_data,
},
"truncated": len(entries) >= REPORT_DATA_LIMIT,
"limit": REPORT_DATA_LIMIT,
}
elif data_source == "projects":
@@ -432,7 +464,7 @@ def generate_report_data(config, user_id=None):
if filters.get("status"):
query = query.filter(Project.status == filters["status"])
projects = query.all()
projects = query.limit(REPORT_DATA_LIMIT).all()
return {
"data": [
@@ -477,7 +509,7 @@ def generate_report_data(config, user_id=None):
joinedload(Expense.project),
joinedload(Expense.user),
joinedload(Expense.client),
).order_by(Expense.expense_date.desc()).all()
).order_by(Expense.expense_date.desc()).limit(REPORT_DATA_LIMIT).all()
data_list = [
{
@@ -542,7 +574,7 @@ def generate_report_data(config, user_id=None):
invoices = query.options(
joinedload(Invoice.project),
joinedload(Invoice.client),
).order_by(Invoice.issue_date.desc()).all()
).order_by(Invoice.issue_date.desc()).limit(REPORT_DATA_LIMIT).all()
data_list = [
{
+79
View File
@@ -609,6 +609,85 @@ def mark_reimbursed(mileage_id):
return redirect(url_for("mileage.view_mileage", mileage_id=mileage_id))
@mileage_bp.route("/mileage/gps", methods=["GET"])
@login_required
@module_enabled("mileage")
def gps_tracking_page():
"""GPS mileage tracking helper page."""
projects = Project.query.filter_by(status="active").order_by(Project.name).all()
clients = Client.get_active_clients()
return render_template("mileage/gps.html", projects=projects, clients=clients)
@mileage_bp.route("/api/mileage/gps/start", methods=["POST"])
@login_required
@module_enabled("mileage")
def web_gps_start():
from app.services.gps_tracking_service import GPSTrackingService
data = request.get_json() or {}
result = GPSTrackingService().start_tracking(
user_id=current_user.id,
latitude=data.get("latitude"),
longitude=data.get("longitude"),
location=data.get("location"),
)
status_code = 201 if result.get("success") else 400
return jsonify(result), status_code
@mileage_bp.route("/api/mileage/gps/<int:track_id>/point", methods=["POST"])
@login_required
@module_enabled("mileage")
def web_gps_add_point(track_id):
from app.services.gps_tracking_service import GPSTrackingService
data = request.get_json() or {}
if data.get("latitude") is None or data.get("longitude") is None:
return jsonify({"success": False, "message": "latitude and longitude are required"}), 400
result = GPSTrackingService().add_track_point(
track_id=track_id,
latitude=data.get("latitude"),
longitude=data.get("longitude"),
)
status_code = 200 if result.get("success") else 400
return jsonify(result), status_code
@mileage_bp.route("/api/mileage/gps/<int:track_id>/stop", methods=["POST"])
@login_required
@module_enabled("mileage")
def web_gps_stop(track_id):
from app.services.gps_tracking_service import GPSTrackingService
data = request.get_json() or {}
result = GPSTrackingService().stop_tracking(
track_id=track_id,
latitude=data.get("latitude"),
longitude=data.get("longitude"),
location=data.get("location"),
)
status_code = 200 if result.get("success") else 400
return jsonify(result), status_code
@mileage_bp.route("/api/mileage/gps/<int:track_id>/expense", methods=["POST"])
@login_required
@module_enabled("mileage")
def web_gps_create_expense(track_id):
from app.services.gps_tracking_service import GPSTrackingService
data = request.get_json() or {}
result = GPSTrackingService().create_expense_from_track(
track_id=track_id,
project_id=data.get("project_id"),
rate_per_km=data.get("rate_per_km"),
)
status_code = 200 if result.get("success") else 400
return jsonify(result), status_code
# API endpoints
@mileage_bp.route("/api/mileage", methods=["GET"])
@login_required
+96 -4
View File
@@ -960,18 +960,101 @@ def bulk_update_status():
return redirect(url_for("tasks.list_tasks"))
@tasks_bp.route("/tasks/bulk-update-due-date", methods=["POST"])
@login_required
def bulk_update_due_date():
"""Update due date for multiple tasks at once (e.g. from overdue page). Accepts JSON or form."""
if request.is_json:
data = request.get_json(silent=True) or {}
task_ids = [str(x) for x in data.get("task_ids") or []]
due_date_str = (data.get("due_date") or "").strip()
else:
task_ids = request.form.getlist("task_ids[]")
due_date_str = (request.form.get("due_date") or "").strip()
if not task_ids:
if request.is_json:
return jsonify({"success": False, "message": _("No tasks selected")}), 400
flash(_("No tasks selected"), "warning")
return redirect(url_for("tasks.list_tasks"))
if not due_date_str:
if request.is_json:
return jsonify({"success": False, "message": _("Due date is required (YYYY-MM-DD)")}), 400
flash(_("Due date is required"), "error")
return redirect(url_for("tasks.list_tasks"))
try:
from datetime import datetime as dt
due_date = dt.strptime(due_date_str, "%Y-%m-%d").date()
except ValueError:
if request.is_json:
return jsonify({"success": False, "message": _("Invalid date format. Use YYYY-MM-DD")}), 400
flash(_("Invalid date format. Use YYYY-MM-DD"), "error")
return redirect(url_for("tasks.list_tasks"))
updated_count = 0
skipped_count = 0
for task_id_str in task_ids:
try:
task_id = int(task_id_str)
task = Task.query.get(task_id)
if not task:
continue
if not current_user.is_admin and task.created_by != current_user.id:
skipped_count += 1
continue
task.update_due_date(due_date)
updated_count += 1
except Exception:
skipped_count += 1
if updated_count > 0:
if not safe_commit("bulk_update_task_due_date", {"count": updated_count, "due_date": due_date_str}):
if request.is_json:
return jsonify({"success": False, "message": _("Database error")}), 500
flash(_("Could not update tasks due to a database error"), "error")
return redirect(url_for("tasks.list_tasks"))
if request.is_json:
return jsonify({
"success": True,
"updated": updated_count,
"skipped": skipped_count,
"message": _("Updated %(count)s task(s)", count=updated_count) if updated_count else _("No tasks updated"),
})
if updated_count > 0:
flash(
_("Successfully updated %(count)s task(s) due date to %(date)s", count=updated_count, date=due_date_str),
"success",
)
if skipped_count > 0:
flash(_("Skipped %(count)s task(s) (no permission)", count=skipped_count), "warning")
return redirect(url_for("tasks.list_tasks"))
@tasks_bp.route("/tasks/bulk-priority", methods=["POST"])
@login_required
def bulk_update_priority():
"""Update priority for multiple tasks at once"""
task_ids = request.form.getlist("task_ids[]")
new_priority = request.form.get("priority", "").strip()
if request.is_json:
data = request.get_json(silent=True) or {}
task_ids = [str(x) for x in data.get("task_ids") or []]
new_priority = (data.get("priority") or "").strip()
else:
task_ids = request.form.getlist("task_ids[]")
new_priority = (request.form.get("priority") or "").strip()
if not task_ids:
if request.is_json:
return jsonify({"success": False, "message": _("No tasks selected")}), 400
flash(_("No tasks selected"), "warning")
return redirect(url_for("tasks.list_tasks"))
if not new_priority or new_priority not in ["low", "medium", "high", "urgent"]:
if request.is_json:
return jsonify({"success": False, "message": _("Invalid priority value")}), 400
flash(_("Invalid priority value"), "error")
return redirect(url_for("tasks.list_tasks"))
@@ -992,6 +1075,7 @@ def bulk_update_priority():
continue
task.priority = new_priority
task.updated_at = now_in_app_timezone()
updated_count += 1
except Exception:
@@ -999,17 +1083,25 @@ def bulk_update_priority():
if updated_count > 0:
if not safe_commit("bulk_update_task_priority", {"count": updated_count, "priority": new_priority}):
if request.is_json:
return jsonify({"success": False, "message": _("Database error")}), 500
flash(_("Could not update tasks due to a database error"), "error")
return redirect(url_for("tasks.list_tasks"))
if request.is_json:
return jsonify({
"success": True,
"updated": updated_count,
"skipped": skipped_count,
"message": _("Updated %(count)s task(s)", count=updated_count) if updated_count else _("No tasks updated"),
})
if updated_count > 0:
flash(
f'Successfully updated {updated_count} task{"s" if updated_count != 1 else ""} to {new_priority} priority',
"success",
)
if skipped_count > 0:
flash(f'Skipped {skipped_count} task{"s" if skipped_count != 1 else ""} (no permission)', "warning")
return redirect(url_for("tasks.list_tasks"))
+2
View File
@@ -23,6 +23,7 @@ from .peppol_service import PeppolService
from .permission_service import PermissionService
from .backup_service import BackupService
from .health_service import HealthService
from .workforce_governance_service import WorkforceGovernanceService
__all__ = [
"TimeTrackingService",
@@ -45,4 +46,5 @@ __all__ = [
"PermissionService",
"BackupService",
"HealthService",
"WorkforceGovernanceService",
]
+8 -5
View File
@@ -12,6 +12,9 @@ import logging
logger = logging.getLogger(__name__)
# Max rows for report data to avoid unbounded in-memory processing
REPORT_QUERY_LIMIT = 2000
class CustomReportService:
"""Service for building and executing custom reports"""
@@ -56,8 +59,8 @@ class CustomReportService:
if filters.get("project_id"):
query = query.filter(TimeEntry.project_id == filters["project_id"])
# Get data
entries = query.all()
# Get data with limit to avoid loading unbounded rows
entries = query.order_by(TimeEntry.start_time.desc()).limit(REPORT_QUERY_LIMIT).all()
# Apply groupings
grouped_data = self._apply_groupings(entries, groupings)
@@ -79,7 +82,7 @@ class CustomReportService:
if filters.get("client_id"):
query = query.filter(Project.client_id == filters["client_id"])
projects = query.all()
projects = query.order_by(Project.name).limit(REPORT_QUERY_LIMIT).all()
return {"data": [p.to_dict() for p in projects], "summary": {"total_projects": len(projects)}}
@@ -92,7 +95,7 @@ class CustomReportService:
if filters.get("end_date"):
query = query.filter(Invoice.issue_date <= filters["end_date"])
invoices = query.all()
invoices = query.order_by(Invoice.issue_date.desc()).limit(REPORT_QUERY_LIMIT).all()
return {
"data": [i.to_dict() for i in invoices],
@@ -108,7 +111,7 @@ class CustomReportService:
if filters.get("end_date"):
query = query.filter(Expense.expense_date <= filters["end_date"])
expenses = query.all()
expenses = query.order_by(Expense.expense_date.desc()).limit(REPORT_QUERY_LIMIT).all()
return {
"data": [e.to_dict() for e in expenses],
+39
View File
@@ -25,6 +25,15 @@ class TimeTrackingService:
self.time_entry_repo = TimeEntryRepository()
self.project_repo = ProjectRepository()
def _is_locked_period(self, user_id: int, start_time: datetime, end_time: Optional[datetime] = None) -> bool:
from app.services.workforce_governance_service import WorkforceGovernanceService
return WorkforceGovernanceService().is_time_entry_locked(
user_id=user_id,
start_time=start_time,
end_time=end_time,
)
def start_timer(
self,
user_id: int,
@@ -62,6 +71,13 @@ class TimeTrackingService:
dict with 'success', 'message', and 'timer' keys
"""
# Check if user already has an active timer
if self._is_locked_period(user_id, local_now()):
return {
"success": False,
"message": "Timesheet period is closed for this date",
"error": "timesheet_period_locked",
}
active_timer = self.time_entry_repo.get_active_timer(user_id)
if active_timer:
return {
@@ -244,6 +260,13 @@ class TimeTrackingService:
return err
# Validate time range
if self._is_locked_period(user_id, start_time, end_time):
return {
"success": False,
"message": "Timesheet period is closed for the selected date range",
"error": "timesheet_period_locked",
}
if end_time <= start_time:
return {"success": False, "message": "End time must be after start time", "error": "invalid_time_range"}
@@ -380,6 +403,14 @@ class TimeTrackingService:
if not is_admin and entry.user_id != user_id:
return {"success": False, "message": "Access denied", "error": "access_denied"}
# Block non-admin edits in closed periods
if (not is_admin) and self._is_locked_period(entry.user_id, entry.start_time, entry.end_time or entry.start_time):
return {
"success": False,
"message": "Timesheet period is closed for this entry",
"error": "timesheet_period_locked",
}
# Don't allow updating active entries to have end_time
if entry.is_active and end_time is not None:
return {
@@ -547,6 +578,14 @@ class TimeTrackingService:
if not is_admin and entry.user_id != user_id:
return {"success": False, "message": "Access denied", "error": "access_denied"}
# Block non-admin deletes in closed periods
if (not is_admin) and self._is_locked_period(entry.user_id, entry.start_time, entry.end_time or entry.start_time):
return {
"success": False,
"message": "Timesheet period is closed for this entry",
"error": "timesheet_period_locked",
}
# Don't allow deletion of active entries
if entry.is_active:
return {