feat(api): implement and enhance search API endpoints

- Fix syntax error in existing /api/search endpoint (missing parenthesis in tasks query)
- Enhance /api/search endpoint with types filter, improved error handling, and response metadata
- Add new /api/v1/search endpoint with token-based authentication
  - Requires read:projects scope
  - Respects user permissions (non-admin users see only their own time entries)
  - Supports filtering by entity type (project, task, client, entry)
  - Includes OpenAPI documentation
- Add comprehensive test suite for both endpoints
  - Tests for legacy /api/search (session-based auth)
  - Tests for /api/v1/search (token-based auth)
  - Covers authentication, authorization, filtering, and search functionality
- Update API documentation in docs/api/REST_API.md
  - Add search endpoint documentation with examples
  - Include parameter descriptions and response formats
- Add search endpoint to /api/v1/info endpoint listing

This addresses the HIGH PRIORITY requirement to implement the search API
endpoint that was referenced but may not have been fully functional.

Resolves: Search API endpoint (/api/search) referenced but may not exist
This commit is contained in:
Dries Peeters
2025-12-29 12:40:38 +01:00
parent 95a35d2cd0
commit e2d7e21447
4 changed files with 650 additions and 109 deletions

View File

@@ -61,140 +61,160 @@ def timer_status():
@api_bp.route("/api/search")
@login_required
def search():
"""Global search endpoint for projects, tasks, clients, and time entries"""
"""Global search endpoint for projects, tasks, clients, and time entries
Query Parameters:
q (str): Search query (minimum 2 characters)
limit (int): Maximum number of results per category (default: 10, max: 50)
types (str): Comma-separated list of types to search (project, task, client, entry)
Returns:
JSON object with search results array
"""
query = request.args.get("q", "").strip()
limit = request.args.get("limit", 10, type=int)
limit = min(request.args.get("limit", 10, type=int), 50) # Cap at 50
types_filter = request.args.get("types", "").strip().lower()
if not query or len(query) < 2:
return jsonify({"results": []})
return jsonify({"results": [], "query": query})
# Parse types filter
allowed_types = {"project", "task", "client", "entry"}
if types_filter:
requested_types = {t.strip() for t in types_filter.split(",") if t.strip()}
search_types = requested_types.intersection(allowed_types)
else:
search_types = allowed_types
results = []
search_pattern = f"%{query}%"
# Search projects
try:
projects = (
Project.query.filter(
Project.status == "active",
or_(Project.name.ilike(search_pattern), Project.description.ilike(search_pattern)),
if "project" in search_types:
try:
projects = (
Project.query.filter(
Project.status == "active",
or_(Project.name.ilike(search_pattern), Project.description.ilike(search_pattern)),
)
.limit(limit)
.all()
)
.limit(limit)
.all()
)
for project in projects:
results.append(
{
"type": "project",
"category": "project",
"id": project.id,
"title": project.name,
"description": project.description or "",
"url": f"/projects/{project.id}",
"badge": "Project",
}
)
except Exception as e:
current_app.logger.error(f"Error searching projects: {e}")
for project in projects:
results.append(
{
"type": "project",
"category": "project",
"id": project.id,
"title": project.name,
"description": project.description or "",
"url": f"/projects/{project.id}",
"badge": "Project",
}
)
except Exception as e:
current_app.logger.error(f"Error searching projects: {e}")
# Search tasks
try:
tasks = (
Task.query.join(Project)
.filter(
Project.status == "active", or_(Task.name.ilike(search_pattern), Task.description.ilike(search_pattern))
if "task" in search_types:
try:
tasks = (
Task.query.join(Project)
.filter(
Project.status == "active",
or_(Task.name.ilike(search_pattern), Task.description.ilike(search_pattern))
)
.limit(limit)
.all()
)
.limit(limit)
.all()
)
for task in tasks:
results.append(
{
"type": "task",
"category": "task",
"id": task.id,
"title": task.name,
"description": f"{task.project.name if task.project else 'No Project'}",
"url": f"/tasks/{task.id}",
"badge": task.status.replace("_", " ").title() if task.status else "Task",
}
)
except Exception as e:
current_app.logger.error(f"Error searching tasks: {e}")
for task in tasks:
results.append(
{
"type": "task",
"category": "task",
"id": task.id,
"title": task.name,
"description": f"{task.project.name if task.project else 'No Project'}",
"url": f"/tasks/{task.id}",
"badge": task.status.replace("_", " ").title() if task.status else "Task",
}
)
except Exception as e:
current_app.logger.error(f"Error searching tasks: {e}")
# Search clients
try:
clients = (
Client.query.filter(
or_(
Client.name.ilike(search_pattern),
Client.email.ilike(search_pattern),
Client.company.ilike(search_pattern),
if "client" in search_types:
try:
clients = (
Client.query.filter(
or_(
Client.name.ilike(search_pattern),
Client.email.ilike(search_pattern),
Client.company.ilike(search_pattern),
)
)
.limit(limit)
.all()
)
.limit(limit)
.all()
)
for client in clients:
results.append(
{
"type": "client",
"category": "client",
"id": client.id,
"title": client.name,
"description": client.company or client.email or "",
"url": f"/clients/{client.id}",
"badge": "Client",
}
)
except Exception as e:
current_app.logger.error(f"Error searching clients: {e}")
for client in clients:
results.append(
{
"type": "client",
"category": "client",
"id": client.id,
"title": client.name,
"description": client.company or client.email or "",
"url": f"/clients/{client.id}",
"badge": "Client",
}
)
except Exception as e:
current_app.logger.error(f"Error searching clients: {e}")
# Search time entries (notes and tags)
try:
entries = (
TimeEntry.query.filter(
TimeEntry.user_id == current_user.id,
TimeEntry.end_time.isnot(None),
or_(TimeEntry.notes.ilike(search_pattern), TimeEntry.tags.ilike(search_pattern)),
if "entry" in search_types:
try:
entries = (
TimeEntry.query.filter(
TimeEntry.user_id == current_user.id,
TimeEntry.end_time.isnot(None),
or_(TimeEntry.notes.ilike(search_pattern), TimeEntry.tags.ilike(search_pattern)),
)
.order_by(TimeEntry.start_time.desc())
.limit(limit)
.all()
)
.order_by(TimeEntry.start_time.desc())
.limit(limit)
.all()
)
for entry in entries:
title_parts = []
if entry.project:
title_parts.append(entry.project.name)
if entry.task:
title_parts.append(f"{entry.task.name}")
title = " ".join(title_parts) if title_parts else "Time Entry"
for entry in entries:
title_parts = []
if entry.project:
title_parts.append(entry.project.name)
if entry.task:
title_parts.append(f"{entry.task.name}")
title = " ".join(title_parts) if title_parts else "Time Entry"
description = entry.notes[:100] if entry.notes else ""
if entry.tags:
description += f" [{entry.tags}]"
description = entry.notes[:100] if entry.notes else ""
if entry.tags:
description += f" [{entry.tags}]"
results.append(
{
"type": "entry",
"category": "entry",
"id": entry.id,
"title": title,
"description": description,
"url": f"/timer/edit/{entry.id}",
"badge": entry.duration_formatted,
}
)
except Exception as e:
current_app.logger.error(f"Error searching time entries: {e}")
results.append(
{
"type": "entry",
"category": "entry",
"id": entry.id,
"title": title,
"description": description,
"url": f"/timer/edit/{entry.id}",
"badge": entry.duration_formatted,
}
)
except Exception as e:
current_app.logger.error(f"Error searching time entries: {e}")
# Limit total results
results = results[:limit]
return jsonify({"results": results})
return jsonify({"results": results, "query": query, "count": len(results)})
@api_bp.route("/api/deadlines/upcoming")

View File

@@ -178,6 +178,7 @@ def api_info():
"webhooks": "/api/v1/webhooks",
"users": "/api/v1/users",
"reports": "/api/v1/reports",
"search": "/api/v1/search",
},
}
)
@@ -5353,6 +5354,221 @@ def receive_purchase_order_api(po_id):
return jsonify({"error": str(e)}), 400
# ==================== Search ====================
@api_v1_bp.route("/search", methods=["GET"])
@require_api_token("read:projects")
def search():
"""Global search endpoint across projects, tasks, clients, and time entries
---
tags:
- Search
parameters:
- name: q
in: query
type: string
required: true
description: Search query (minimum 2 characters)
- name: limit
in: query
type: integer
default: 10
description: Maximum number of results per category (max 50)
- name: types
in: query
type: string
description: Comma-separated list of types to search (project, task, client, entry)
security:
- Bearer: []
responses:
200:
description: Search results
schema:
type: object
properties:
results:
type: array
items:
type: object
properties:
type:
type: string
enum: [project, task, client, entry]
category:
type: string
id:
type: integer
title:
type: string
description:
type: string
url:
type: string
badge:
type: string
query:
type: string
count:
type: integer
400:
description: Invalid query (too short)
"""
query = request.args.get("q", "").strip()
limit = min(request.args.get("limit", 10, type=int), 50) # Cap at 50
types_filter = request.args.get("types", "").strip().lower()
if not query or len(query) < 2:
return jsonify({"error": "Query must be at least 2 characters", "results": []}), 400
# Parse types filter
allowed_types = {"project", "task", "client", "entry"}
if types_filter:
requested_types = {t.strip() for t in types_filter.split(",") if t.strip()}
search_types = requested_types.intersection(allowed_types)
else:
search_types = allowed_types
results = []
search_pattern = f"%{query}%"
# Get authenticated user from API token
user = g.api_user
# Search projects
if "project" in search_types:
try:
projects = (
Project.query.filter(
Project.status == "active",
or_(Project.name.ilike(search_pattern), Project.description.ilike(search_pattern)),
)
.limit(limit)
.all()
)
for project in projects:
results.append(
{
"type": "project",
"category": "project",
"id": project.id,
"title": project.name,
"description": project.description or "",
"url": f"/projects/{project.id}",
"badge": "Project",
}
)
except Exception as e:
current_app.logger.error(f"Error searching projects: {e}")
# Search tasks
if "task" in search_types:
try:
tasks = (
Task.query.join(Project)
.filter(
Project.status == "active",
or_(Task.name.ilike(search_pattern), Task.description.ilike(search_pattern))
)
.limit(limit)
.all()
)
for task in tasks:
results.append(
{
"type": "task",
"category": "task",
"id": task.id,
"title": task.name,
"description": f"{task.project.name if task.project else 'No Project'}",
"url": f"/tasks/{task.id}",
"badge": task.status.replace("_", " ").title() if task.status else "Task",
}
)
except Exception as e:
current_app.logger.error(f"Error searching tasks: {e}")
# Search clients
if "client" in search_types:
try:
clients = (
Client.query.filter(
or_(
Client.name.ilike(search_pattern),
Client.email.ilike(search_pattern),
Client.company.ilike(search_pattern),
)
)
.limit(limit)
.all()
)
for client in clients:
results.append(
{
"type": "client",
"category": "client",
"id": client.id,
"title": client.name,
"description": client.company or client.email or "",
"url": f"/clients/{client.id}",
"badge": "Client",
}
)
except Exception as e:
current_app.logger.error(f"Error searching clients: {e}")
# Search time entries (notes and tags)
# Non-admin users can only see their own entries
if "entry" in search_types:
try:
entries_query = TimeEntry.query.filter(
TimeEntry.end_time.isnot(None),
or_(TimeEntry.notes.ilike(search_pattern), TimeEntry.tags.ilike(search_pattern)),
)
# Restrict to user's entries if not admin
if not user.is_admin:
entries_query = entries_query.filter(TimeEntry.user_id == user.id)
entries = (
entries_query
.order_by(TimeEntry.start_time.desc())
.limit(limit)
.all()
)
for entry in entries:
title_parts = []
if entry.project:
title_parts.append(entry.project.name)
if entry.task:
title_parts.append(f"{entry.task.name}")
title = " ".join(title_parts) if title_parts else "Time Entry"
description = entry.notes[:100] if entry.notes else ""
if entry.tags:
description += f" [{entry.tags}]"
results.append(
{
"type": "entry",
"category": "entry",
"id": entry.id,
"title": title,
"description": description,
"url": f"/timer/edit/{entry.id}",
"badge": entry.duration_formatted,
}
)
except Exception as e:
current_app.logger.error(f"Error searching time entries: {e}")
return jsonify({"results": results, "query": query, "count": len(results)})
# ==================== Error Handlers ====================

View File

@@ -171,6 +171,75 @@ GET /api/v1/health
Check if the API is operational. No authentication required.
### Search
#### Global Search
```
GET /api/v1/search
```
Perform a global search across projects, tasks, clients, and time entries.
**Required Scope:** `read:projects`
**Query Parameters:**
- `q` (required) - Search query (minimum 2 characters)
- `limit` (optional) - Maximum number of results per category (default: 10, max: 50)
- `types` (optional) - Comma-separated list of types to search: `project`, `task`, `client`, `entry`
**Example:**
```bash
curl -H "Authorization: Bearer YOUR_TOKEN" \
"https://your-domain.com/api/v1/search?q=website&limit=10"
```
**Search by specific types:**
```bash
curl -H "Authorization: Bearer YOUR_TOKEN" \
"https://your-domain.com/api/v1/search?q=website&types=project,task"
```
**Response:**
```json
{
"results": [
{
"type": "project",
"category": "project",
"id": 1,
"title": "Website Redesign",
"description": "Complete website overhaul",
"url": "/projects/1",
"badge": "Project"
},
{
"type": "task",
"category": "task",
"id": 5,
"title": "Update homepage",
"description": "Website Redesign",
"url": "/tasks/5",
"badge": "In Progress"
}
],
"query": "website",
"count": 2
}
```
**Search Behavior:**
- **Projects**: Searches in name and description (active projects only)
- **Tasks**: Searches in name and description (tasks from active projects only)
- **Clients**: Searches in name, email, and company
- **Time Entries**: Searches in notes and tags (non-admin users see only their own entries)
**Error Responses:**
- `400 Bad Request` - Query is too short (less than 2 characters)
- `401 Unauthorized` - Missing or invalid API token
- `403 Forbidden` - Token lacks `read:projects` scope
**Note:** The legacy endpoint `/api/search` is also available for session-based authentication (requires login).
### Projects
#### List Projects

View File

@@ -0,0 +1,236 @@
"""
Tests for search API endpoints.
Tests both /api/search (legacy) and /api/v1/search (versioned API).
"""
import pytest
from app.models import Project, Task, Client, TimeEntry, ApiToken
class TestLegacySearchAPI:
"""Tests for legacy /api/search endpoint (session-based auth)"""
def test_search_with_valid_query(self, authenticated_client, project):
"""Test search with valid query"""
response = authenticated_client.get("/api/search", query_string={"q": "test"})
assert response.status_code == 200
data = response.get_json()
assert "results" in data
assert "query" in data
assert "count" in data
assert isinstance(data["results"], list)
def test_search_with_short_query(self, authenticated_client):
"""Test search with query that's too short"""
response = authenticated_client.get("/api/search", query_string={"q": "a"})
assert response.status_code == 200
data = response.get_json()
assert data["results"] == []
assert data["count"] == 0
def test_search_with_empty_query(self, authenticated_client):
"""Test search with empty query"""
response = authenticated_client.get("/api/search", query_string={"q": ""})
assert response.status_code == 200
data = response.get_json()
assert data["results"] == []
def test_search_with_limit(self, authenticated_client, project):
"""Test search with custom limit"""
response = authenticated_client.get("/api/search", query_string={"q": "test", "limit": 5})
assert response.status_code == 200
data = response.get_json()
assert len(data["results"]) <= 5
def test_search_with_types_filter(self, authenticated_client, project):
"""Test search with types filter"""
response = authenticated_client.get("/api/search", query_string={"q": "test", "types": "project"})
assert response.status_code == 200
data = response.get_json()
# All results should be projects
for result in data["results"]:
assert result["type"] == "project"
def test_search_projects(self, authenticated_client, project):
"""Test searching for projects"""
response = authenticated_client.get("/api/search", query_string={"q": project.name[:3]})
assert response.status_code == 200
data = response.get_json()
project_results = [r for r in data["results"] if r["type"] == "project"]
assert len(project_results) > 0
assert any(r["id"] == project.id for r in project_results)
def test_search_requires_authentication(self, client):
"""Test that search requires authentication"""
response = client.get("/api/search", query_string={"q": "test"})
# Should redirect to login
assert response.status_code in [302, 401]
class TestV1SearchAPI:
"""Tests for /api/v1/search endpoint (token-based auth)"""
@pytest.fixture
def api_token(self, app, user):
"""Create an API token for testing"""
token, plain_token = ApiToken.create_token(
user_id=user.id, name="Test API Token", scopes="read:projects"
)
from app import db
db.session.add(token)
db.session.commit()
return token, plain_token
@pytest.fixture
def api_client(self, app, api_token):
"""Create a test client with API token"""
token, plain_token = api_token
test_client = app.test_client()
test_client.environ_base["HTTP_AUTHORIZATION"] = f"Bearer {plain_token}"
return test_client
def test_search_with_valid_query(self, api_client, project):
"""Test search with valid query"""
response = api_client.get("/api/v1/search", query_string={"q": "test"})
assert response.status_code == 200
data = response.get_json()
assert "results" in data
assert "query" in data
assert "count" in data
assert isinstance(data["results"], list)
def test_search_with_short_query(self, api_client):
"""Test search with query that's too short"""
response = api_client.get("/api/v1/search", query_string={"q": "a"})
assert response.status_code == 400
data = response.get_json()
assert "error" in data
assert "results" in data
def test_search_with_empty_query(self, api_client):
"""Test search with empty query"""
response = api_client.get("/api/v1/search", query_string={"q": ""})
assert response.status_code == 400
def test_search_requires_authentication(self, app):
"""Test that search requires authentication"""
test_client = app.test_client()
response = test_client.get("/api/v1/search", query_string={"q": "test"})
assert response.status_code == 401
data = response.get_json()
assert "error" in data
def test_search_requires_read_projects_scope(self, app, user):
"""Test that search requires read:projects scope"""
# Create token without read:projects scope
token, plain_token = ApiToken.create_token(
user_id=user.id, name="Test Token", scopes="read:time_entries"
)
from app import db
db.session.add(token)
db.session.commit()
test_client = app.test_client()
test_client.environ_base["HTTP_AUTHORIZATION"] = f"Bearer {plain_token}"
response = test_client.get("/api/v1/search", query_string={"q": "test"})
assert response.status_code == 403
data = response.get_json()
assert "error" in data
assert "Insufficient permissions" in data["error"]
def test_search_with_limit(self, api_client, project):
"""Test search with custom limit"""
response = api_client.get("/api/v1/search", query_string={"q": "test", "limit": 5})
assert response.status_code == 200
data = response.get_json()
# Should respect limit per category, so total might be higher
assert isinstance(data["results"], list)
def test_search_with_types_filter(self, api_client, project):
"""Test search with types filter"""
response = api_client.get("/api/v1/search", query_string={"q": "test", "types": "project"})
assert response.status_code == 200
data = response.get_json()
# All results should be projects
for result in data["results"]:
assert result["type"] == "project"
def test_search_projects(self, api_client, project):
"""Test searching for projects"""
response = api_client.get("/api/v1/search", query_string={"q": project.name[:3]})
assert response.status_code == 200
data = response.get_json()
project_results = [r for r in data["results"] if r["type"] == "project"]
assert len(project_results) > 0
assert any(r["id"] == project.id for r in project_results)
def test_search_time_entries_respects_user_permissions(self, app, user, project):
"""Test that non-admin users only see their own time entries"""
from app import db
from datetime import datetime, timedelta
# Create API token for user
token, plain_token = ApiToken.create_token(
user_id=user.id, name="Test Token", scopes="read:projects"
)
db.session.add(token)
# Create time entry for this user
entry = TimeEntry(
user_id=user.id,
project_id=project.id,
start_time=datetime.now() - timedelta(hours=1),
end_time=datetime.now(),
notes="Test search entry",
)
db.session.add(entry)
db.session.commit()
test_client = app.test_client()
test_client.environ_base["HTTP_AUTHORIZATION"] = f"Bearer {plain_token}"
response = test_client.get("/api/v1/search", query_string={"q": "Test search", "types": "entry"})
assert response.status_code == 200
data = response.get_json()
# Should find the user's own entry
entry_results = [r for r in data["results"] if r["type"] == "entry"]
assert any(r["id"] == entry.id for r in entry_results)
def test_search_clients(self, api_client, client):
"""Test searching for clients"""
response = api_client.get("/api/v1/search", query_string={"q": client.name[:3]})
assert response.status_code == 200
data = response.get_json()
client_results = [r for r in data["results"] if r["type"] == "client"]
assert len(client_results) > 0
assert any(r["id"] == client.id for r in client_results)
def test_search_tasks(self, api_client, task):
"""Test searching for tasks"""
response = api_client.get("/api/v1/search", query_string={"q": task.name[:3]})
assert response.status_code == 200
data = response.get_json()
task_results = [r for r in data["results"] if r["type"] == "task"]
assert len(task_results) > 0
assert any(r["id"] == task.id for r in task_results)