Files
TimeTracker/app/services/time_tracking_service.py
T
Dries Peeters 6c8e86cd01 fix(timer): respect Settings.single_active_timer at runtime
Timer starts always blocked a second running entry and never read the\nadmin-controlled Settings flag.\n\n- Add TimeTrackingService.can_start_timer() using Settings.get_settings()\n  and wire it into start_timer, web timer routes, kiosk start, and\n  legacy POST /api/timer/resume.\n- POST /api/v1/timer/start returns 409 with error_code\n  timer_already_running when single-active mode is on and a timer\n  is already running.\n- Deduplicate start_timer template handling in the service.\n\nTests: tests/test_single_active_timer_setting.py.\nDocs: REST_API (responses), GETTING_STARTED, REQUIREMENTS, Docker env\nnotes, TESTING_STRATEGY, env.example comment; CHANGELOG entry.
2026-04-27 19:16:25 +02:00

715 lines
28 KiB
Python

"""
Service for time tracking business logic.
"""
from datetime import datetime
from typing import Any, Dict, List, Optional
from flask_login import current_user
from app import db
from app.constants import TimeEntrySource, TimeEntryStatus, WebhookEvent
from app.models import Project, Settings, Task, TimeEntry
from app.models.time_entry import local_now
from app.repositories import ProjectRepository, TimeEntryRepository
from app.utils.db import safe_commit
from app.utils.event_bus import emit_event
from app.utils.time_entry_validation import validate_time_entry_requirements
from app.utils.timezone import parse_local_datetime
class TimeTrackingService:
"""Service for time tracking operations"""
def __init__(self):
self.time_entry_repo = TimeEntryRepository()
self.project_repo = ProjectRepository()
def _is_locked_period(self, user_id: int, start_time: datetime, end_time: Optional[datetime] = None) -> bool:
from app.services.workforce_governance_service import WorkforceGovernanceService
return WorkforceGovernanceService().is_time_entry_locked(
user_id=user_id,
start_time=start_time,
end_time=end_time,
)
def can_start_timer(self, user_id: int) -> tuple[bool, Optional[str]]:
"""Return (True, None) if the user may start a new timer, else (False, message).
Reads ``Settings.get_settings()`` at call time (DB), not ``Config.SINGLE_ACTIVE_TIMER``
alone—env seeds new installs; admin UI updates the row users expect at runtime.
"""
settings = Settings.get_settings()
if not settings.single_active_timer:
return True, None
if self.time_entry_repo.get_active_timer(user_id):
return False, "You already have an active timer. Stop it before starting a new one."
return True, None
def start_timer(
self,
user_id: int,
project_id: int,
task_id: Optional[int] = None,
notes: Optional[str] = None,
template_id: Optional[int] = None,
) -> Dict[str, Any]:
"""
Start a new timer for a user.
Returns:
dict with 'success', 'message', and 'timer' keys
"""
if self._is_locked_period(user_id, local_now()):
return {
"success": False,
"message": "Timesheet period is closed for this date",
"error": "timesheet_period_locked",
}
ok, conflict_msg = self.can_start_timer(user_id)
if not ok:
return {
"success": False,
"message": conflict_msg or "You already have an active timer. Stop it before starting a new one.",
"error": "timer_already_running",
}
# Resolve template defaults before project validation
if template_id:
from app.models import TimeEntryTemplate
template = TimeEntryTemplate.query.filter_by(id=template_id, user_id=user_id).first()
if template:
if not project_id and template.project_id:
project_id = template.project_id
if not task_id and template.task_id:
task_id = template.task_id
if not notes and template.default_notes:
notes = template.default_notes
template.record_usage()
# Validate project
project = self.project_repo.get_by_id(project_id)
if not project:
return {"success": False, "message": "Invalid project selected", "error": "invalid_project"}
# Check project status
if project.status == "archived":
return {
"success": False,
"message": "Cannot start timer for an archived project. Please unarchive the project first.",
"error": "project_archived",
}
if project.status != "active":
return {
"success": False,
"message": "Cannot start timer for an inactive project",
"error": "project_inactive",
}
# Validate task if provided
if task_id:
task = Task.query.filter_by(id=task_id, project_id=project_id).first()
if not task:
return {
"success": False,
"message": "Selected task is invalid for the chosen project",
"error": "invalid_task",
}
# Validate time entry requirements (task, description)
settings = Settings.get_settings()
err = validate_time_entry_requirements(
settings, project_id=project_id, client_id=None, task_id=task_id, notes=notes
)
if err:
return err
# Create timer
timer = self.time_entry_repo.create_timer(
user_id=user_id, project_id=project_id, task_id=task_id, notes=notes, source=TimeEntrySource.AUTO.value
)
if not safe_commit("start_timer", {"user_id": user_id, "project_id": project_id}):
return {
"success": False,
"message": "Could not start timer due to a database error",
"error": "database_error",
}
# Emit domain event
emit_event(
WebhookEvent.TIME_ENTRY_CREATED.value, {"entry_id": timer.id, "user_id": user_id, "project_id": project_id}
)
return {"success": True, "message": "Timer started successfully", "timer": timer}
def stop_timer(self, user_id: int, entry_id: Optional[int] = None) -> Dict[str, Any]:
"""
Stop the active timer for a user.
Returns:
dict with 'success', 'message', and 'entry' keys
"""
if entry_id:
entry = self.time_entry_repo.get_by_id(entry_id)
else:
entry = self.time_entry_repo.get_active_timer(user_id)
if not entry:
return {"success": False, "message": "No active timer found", "error": "no_active_timer"}
if entry.user_id != user_id:
return {"success": False, "message": "You can only stop your own timer", "error": "unauthorized"}
if entry.end_time is not None:
return {"success": False, "message": "Timer is already stopped", "error": "timer_already_stopped"}
# Stop the timer
entry.end_time = local_now()
entry.calculate_duration()
if not safe_commit("stop_timer", {"user_id": user_id, "entry_id": entry.id}):
return {
"success": False,
"message": "Could not stop timer due to a database error",
"error": "database_error",
}
return {"success": True, "message": "Timer stopped successfully", "entry": entry}
def pause_timer(self, user_id: int) -> Dict[str, Any]:
"""Pause the active timer for a user. Clock stops; break accumulates on resume."""
entry = self.time_entry_repo.get_active_timer(user_id)
if not entry:
return {"success": False, "message": "No active timer found", "error": "no_active_timer"}
if entry.user_id != user_id:
return {"success": False, "message": "You can only pause your own timer", "error": "unauthorized"}
try:
entry.pause_timer()
except ValueError as e:
return {"success": False, "message": str(e), "error": "invalid_state"}
if not safe_commit("pause_timer", {"user_id": user_id, "entry_id": entry.id}):
return {"success": False, "message": "Could not pause timer", "error": "database_error"}
return {"success": True, "message": "Timer paused", "entry": entry}
def resume_timer(self, user_id: int) -> Dict[str, Any]:
"""Resume a paused timer; time since pause is added to break_seconds."""
entry = self.time_entry_repo.get_active_timer(user_id)
if not entry:
return {"success": False, "message": "No active timer found", "error": "no_active_timer"}
if entry.user_id != user_id:
return {"success": False, "message": "You can only resume your own timer", "error": "unauthorized"}
try:
entry.resume_timer()
except ValueError as e:
return {"success": False, "message": str(e), "error": "invalid_state"}
if not safe_commit("resume_timer", {"user_id": user_id, "entry_id": entry.id}):
return {"success": False, "message": "Could not resume timer", "error": "database_error"}
return {"success": True, "message": "Timer resumed", "entry": entry}
def create_manual_entry(
self,
user_id: int,
project_id: Optional[int] = None,
client_id: Optional[int] = None,
start_time: datetime = None,
end_time: datetime = None,
duration_seconds: Optional[int] = None,
break_seconds: Optional[int] = None,
task_id: Optional[int] = None,
notes: Optional[str] = None,
tags: Optional[str] = None,
billable: bool = True,
paid: bool = False,
invoice_number: Optional[str] = None,
skip_entry_requirements: bool = False,
) -> Dict[str, Any]:
"""
Create a manual time entry.
Returns:
dict with 'success', 'message', and 'entry' keys
"""
# Validate that either project_id or client_id is provided
if not project_id and not client_id:
return {
"success": False,
"message": "Either project or client must be selected",
"error": "missing_project_or_client",
}
# Validate project if provided
if project_id:
project = self.project_repo.get_by_id(project_id)
if not project:
return {"success": False, "message": "Invalid project", "error": "invalid_project"}
# Validate task if provided (only valid when project_id is set)
if task_id:
task = Task.query.filter_by(id=task_id, project_id=project_id).first()
if not task:
return {"success": False, "message": "Invalid task for selected project", "error": "invalid_task"}
# Validate client if provided
if client_id:
from app.repositories import ClientRepository
client_repo = ClientRepository()
client = client_repo.get_by_id(client_id)
if not client:
return {"success": False, "message": "Invalid client", "error": "invalid_client"}
# Task cannot be set when billing directly to client
if task_id:
return {
"success": False,
"message": "Tasks can only be assigned to project-based time entries",
"error": "task_not_allowed",
}
# Validate time entry requirements (task, description) - skip for imports
if not skip_entry_requirements:
settings = Settings.get_settings()
err = validate_time_entry_requirements(
settings, project_id=project_id, client_id=client_id, task_id=task_id, notes=notes
)
if err:
return err
# Validate time range
if self._is_locked_period(user_id, start_time, end_time):
return {
"success": False,
"message": "Timesheet period is closed for the selected date range",
"error": "timesheet_period_locked",
}
if end_time <= start_time:
return {"success": False, "message": "End time must be after start time", "error": "invalid_time_range"}
# Check for overlapping entries (unless skipped for imports)
if not skip_entry_requirements:
overlapping = TimeEntry.query.filter(
TimeEntry.user_id == user_id,
TimeEntry.start_time < end_time,
TimeEntry.end_time > start_time,
TimeEntry.end_time.isnot(None),
).first()
if overlapping:
return {
"success": False,
"message": "This time overlaps with an existing entry. Please choose a different time range or edit the existing entry.",
"error": "overlapping_entry",
}
if duration_seconds is not None:
try:
duration_seconds = int(duration_seconds)
except Exception:
return {"success": False, "message": "Invalid duration", "error": "invalid_duration"}
if duration_seconds <= 0:
return {"success": False, "message": "Duration must be positive", "error": "invalid_duration"}
# Create entry (duration_seconds is net; break_seconds is stored and subtracted when computing from start/end)
if break_seconds is not None:
break_seconds = max(0, int(break_seconds))
entry = self.time_entry_repo.create_manual_entry(
user_id=user_id,
project_id=project_id,
client_id=client_id,
start_time=start_time,
end_time=end_time,
duration_seconds=duration_seconds,
break_seconds=break_seconds,
task_id=task_id,
notes=notes,
tags=tags,
billable=billable,
paid=paid,
invoice_number=invoice_number,
)
commit_data = {"user_id": user_id}
if project_id:
commit_data["project_id"] = project_id
if client_id:
commit_data["client_id"] = client_id
if not safe_commit("create_manual_entry", commit_data):
return {
"success": False,
"message": "Could not create time entry due to a database error",
"error": "database_error",
}
from app.telemetry.otel_setup import business_span
with business_span(
"timer.persist",
user_id=user_id,
project_based=bool(project_id),
client_only=bool(client_id and not project_id),
has_task=bool(task_id),
):
pass
return {"success": True, "message": "Time entry created successfully", "entry": entry}
def get_user_entries(
self,
user_id: int,
limit: Optional[int] = None,
offset: int = 0,
project_id: Optional[int] = None,
client_id: Optional[int] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> List[TimeEntry]:
"""Get time entries for a user with optional filters"""
if start_date and end_date:
return self.time_entry_repo.get_by_date_range(
start_date=start_date,
end_date=end_date,
user_id=user_id,
project_id=project_id,
client_id=client_id,
include_relations=True,
)
elif project_id:
return self.time_entry_repo.get_by_project(
project_id=project_id, limit=limit, offset=offset, include_relations=True
)
else:
return self.time_entry_repo.get_by_user(user_id=user_id, limit=limit, offset=offset, include_relations=True)
def get_active_timer(self, user_id: int) -> Optional[TimeEntry]:
"""Get the active timer for a user"""
return self.time_entry_repo.get_active_timer(user_id)
def update_entry(
self,
entry_id: int,
user_id: int,
is_admin: bool = False,
project_id: Optional[int] = None,
client_id: Optional[int] = None,
task_id: Optional[int] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
break_seconds: Optional[int] = None,
notes: Optional[str] = None,
tags: Optional[str] = None,
billable: Optional[bool] = None,
paid: Optional[bool] = None,
invoice_number: Optional[str] = None,
reason: Optional[str] = None,
expected_updated_at: Optional[datetime] = None,
) -> Dict[str, Any]:
"""
Update a time entry.
Args:
entry_id: ID of the time entry to update
user_id: ID of the user performing the update
is_admin: Whether the user is an admin
project_id: Optional new project ID
client_id: Optional new client ID
task_id: Optional new task ID
start_time: Optional new start time
end_time: Optional new end time
notes: Optional new notes
tags: Optional new tags
billable: Optional new billable status
paid: Optional new paid status
invoice_number: Optional new invoice number
reason: Optional reason for the change
Returns:
dict with 'success', 'message', and 'entry' keys
"""
entry = self.time_entry_repo.get_by_id(entry_id)
if not entry:
return {"success": False, "message": "Time entry not found", "error": "not_found"}
# Check permissions
if not is_admin and entry.user_id != user_id:
return {"success": False, "message": "Access denied", "error": "access_denied"}
# Optimistic concurrency (optional): mobile / API clients send last known updated_at
if expected_updated_at is not None and entry.updated_at is not None:
if abs((entry.updated_at - expected_updated_at).total_seconds()) > 2:
return {
"success": False,
"message": "Time entry was modified on the server. Refresh and try again.",
"error": "conflict",
"entry": entry,
}
# Block non-admin edits in closed periods
if (not is_admin) and self._is_locked_period(
entry.user_id, entry.start_time, entry.end_time or entry.start_time
):
return {
"success": False,
"message": "Timesheet period is closed for this entry",
"error": "timesheet_period_locked",
}
# Don't allow updating active entries to have end_time
if entry.is_active and end_time is not None:
return {
"success": False,
"message": "Cannot set end_time on active timer. Stop the timer first.",
"error": "timer_active",
}
# Capture old state before changes
from app.utils.audit import capture_timeentry_metadata, capture_timeentry_state
full_old_state = capture_timeentry_state(entry)
entity_metadata = capture_timeentry_metadata(entry)
# Update fields
if project_id is not None:
# Validate project
project = self.project_repo.get_by_id(project_id)
if not project:
return {"success": False, "message": "Invalid project", "error": "invalid_project"}
entry.project_id = project_id
# Clear client_id when setting project_id
entry.client_id = None
# Handle client_id update
if client_id is not None:
from app.repositories import ClientRepository
client_repo = ClientRepository()
client = client_repo.get_by_id(client_id)
if not client:
return {"success": False, "message": "Invalid client", "error": "invalid_client"}
entry.client_id = client_id
# Clear project_id and task_id when setting client_id
entry.project_id = None
entry.task_id = None
if task_id is not None:
# Task can only be set when project_id is set
if not entry.project_id:
return {
"success": False,
"message": "Task can only be assigned to project-based time entries",
"error": "task_requires_project",
}
entry.task_id = task_id
if start_time is not None:
entry.start_time = start_time
if end_time is not None:
entry.end_time = end_time
if break_seconds is not None:
entry.break_seconds = max(0, int(break_seconds))
# Recompute stored duration when start, end, or break changed
if entry.end_time and (start_time is not None or end_time is not None or break_seconds is not None):
entry.calculate_duration()
if notes is not None:
entry.notes = notes
if tags is not None:
entry.tags = tags
if billable is not None:
entry.billable = billable
if paid is not None:
entry.paid = paid
# Clear invoice number if marking as unpaid
if not entry.paid:
entry.invoice_number = None
if invoice_number is not None:
entry.invoice_number = invoice_number.strip() if invoice_number else None
# Validate time entry requirements on updated state (entry reflects changes applied above)
settings = Settings.get_settings()
err = validate_time_entry_requirements(
settings,
project_id=entry.project_id,
client_id=entry.client_id,
task_id=entry.task_id,
notes=entry.notes,
)
if err:
# Rollback uncommitted changes
db.session.rollback()
return err
# Check for overlapping entries (exclude this entry) when times were changed
if entry.end_time and (start_time is not None or end_time is not None):
overlapping = TimeEntry.query.filter(
TimeEntry.user_id == entry.user_id,
TimeEntry.id != entry_id,
TimeEntry.start_time < entry.end_time,
TimeEntry.end_time > entry.start_time,
TimeEntry.end_time.isnot(None),
).first()
if overlapping:
db.session.rollback()
return {
"success": False,
"message": "This time overlaps with an existing entry. Please choose a different time range.",
"error": "overlapping_entry",
}
entry.updated_at = local_now()
if not safe_commit("update_entry", {"user_id": user_id, "entry_id": entry_id}):
return {
"success": False,
"message": "Could not update time entry due to a database error",
"error": "database_error",
}
# Capture new state after changes and create comprehensive audit log
try:
# Refresh entry to get updated values
db.session.refresh(entry)
full_new_state = capture_timeentry_state(entry)
updated_metadata = capture_timeentry_metadata(entry)
from app.models.audit_log import AuditLog
from app.utils.audit import get_request_info
ip_address, user_agent, request_path = get_request_info()
entity_name = entry.project.name if entry.project else (entry.client.name if entry.client else "Unknown")
AuditLog.log_change(
user_id=user_id,
action="updated",
entity_type="TimeEntry",
entity_id=entry_id,
entity_name=entity_name,
change_description=f"Updated time entry for {entity_name}",
reason=reason,
entity_metadata=updated_metadata,
full_old_state=full_old_state,
full_new_state=full_new_state,
ip_address=ip_address,
user_agent=user_agent,
request_path=request_path,
)
db.session.commit()
except Exception as e:
# Don't fail update if audit logging fails
import logging
logging.getLogger(__name__).warning(f"Failed to create audit log for TimeEntry update: {e}")
return {"success": True, "message": "Time entry updated successfully", "entry": entry}
def delete_entry(
self, user_id: int, entry_id: int, is_admin: bool = False, reason: Optional[str] = None
) -> Dict[str, Any]:
"""
Delete a time entry.
Args:
user_id: ID of the user performing the deletion
entry_id: ID of the time entry to delete
is_admin: Whether the user is an admin
reason: Optional reason for deletion
Returns:
dict with 'success' and 'message' keys
"""
entry = self.time_entry_repo.get_by_id(entry_id)
if not entry:
return {"success": False, "message": "Time entry not found", "error": "not_found"}
# Check permissions
if not is_admin and entry.user_id != user_id:
return {"success": False, "message": "Access denied", "error": "access_denied"}
# Block non-admin deletes in closed periods
if (not is_admin) and self._is_locked_period(
entry.user_id, entry.start_time, entry.end_time or entry.start_time
):
return {
"success": False,
"message": "Timesheet period is closed for this entry",
"error": "timesheet_period_locked",
}
# Don't allow deletion of active entries
if entry.is_active:
return {
"success": False,
"message": "Cannot delete active time entry. Stop the timer first.",
"error": "timer_active",
}
# Capture entry info for logging before deletion
project_name = entry.project.name if entry.project else None
client_name = entry.client.name if entry.client else None
entity_name = project_name or client_name or "Unknown"
duration_formatted = entry.duration_formatted
# Capture full state and metadata for audit logging
from app.models.audit_log import AuditLog
from app.utils.audit import capture_timeentry_metadata, capture_timeentry_state, get_request_info
full_old_state = capture_timeentry_state(entry)
entity_metadata = capture_timeentry_metadata(entry)
ip_address, user_agent, request_path = get_request_info()
if self.time_entry_repo.delete(entry):
if safe_commit("delete_entry", {"user_id": user_id, "entry_id": entry_id}):
# Create comprehensive audit log entry
try:
AuditLog.log_change(
user_id=user_id,
action="deleted",
entity_type="TimeEntry",
entity_id=entry_id,
entity_name=entity_name,
change_description=f"Deleted time entry for {entity_name} - {duration_formatted}",
reason=reason,
entity_metadata=entity_metadata,
full_old_state=full_old_state,
ip_address=ip_address,
user_agent=user_agent,
request_path=request_path,
)
db.session.commit()
except Exception as e:
# Don't fail deletion if audit logging fails
import logging
logging.getLogger(__name__).warning(f"Failed to create audit log for TimeEntry deletion: {e}")
# Log activity
from flask import request
from app.models import Activity
Activity.log(
user_id=user_id,
action="deleted",
entity_type="time_entry",
entity_id=entry_id,
entity_name=entity_name,
description=f"Deleted time entry for {entity_name} - {duration_formatted}",
extra_data={
"project_name": project_name,
"client_name": client_name,
"duration_formatted": duration_formatted,
"reason": reason,
},
ip_address=ip_address,
user_agent=user_agent,
)
return {"success": True, "message": "Time entry deleted successfully"}
return {"success": False, "message": "Could not delete time entry", "error": "database_error"}