fix(tests): resolve missing endpoint

This commit is contained in:
perf3ct
2025-07-08 04:37:33 +00:00
parent faf3299119
commit 58b8a71404
12 changed files with 268 additions and 83 deletions

View File

@@ -1,7 +1,7 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{FromRow, PgPool, Row};
use sqlx::{FromRow, PgPool, Row, Column};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Semaphore;
@@ -793,21 +793,104 @@ impl OcrQueueService {
/// Get queue statistics
pub async fn get_stats(&self) -> Result<QueueStats> {
tracing::debug!("OCR Queue: Starting get_stats() call");
let stats = sqlx::query(
r#"
SELECT * FROM get_ocr_queue_stats()
"#
)
.fetch_one(&self.pool)
.await?;
.await
.map_err(|e| {
tracing::error!("OCR Queue: Failed to execute get_ocr_queue_stats(): {}", e);
e
})?;
tracing::debug!("OCR Queue: Successfully fetched stats row");
// Debug: Print all column names and their types
let columns = stats.columns();
for (i, column) in columns.iter().enumerate() {
tracing::debug!("OCR Queue: Column {}: name='{}', type='{:?}'", i, column.name(), column.type_info());
}
// Try to extract each field with detailed error handling
let pending_count = match stats.try_get::<i64, _>("pending_count") {
Ok(val) => {
tracing::debug!("OCR Queue: pending_count = {}", val);
val
}
Err(e) => {
tracing::error!("OCR Queue: Failed to get pending_count: {}", e);
return Err(anyhow::anyhow!("Failed to get pending_count: {}", e));
}
};
let processing_count = match stats.try_get::<i64, _>("processing_count") {
Ok(val) => {
tracing::debug!("OCR Queue: processing_count = {}", val);
val
}
Err(e) => {
tracing::error!("OCR Queue: Failed to get processing_count: {}", e);
return Err(anyhow::anyhow!("Failed to get processing_count: {}", e));
}
};
let failed_count = match stats.try_get::<i64, _>("failed_count") {
Ok(val) => {
tracing::debug!("OCR Queue: failed_count = {}", val);
val
}
Err(e) => {
tracing::error!("OCR Queue: Failed to get failed_count: {}", e);
return Err(anyhow::anyhow!("Failed to get failed_count: {}", e));
}
};
let completed_today = match stats.try_get::<i64, _>("completed_today") {
Ok(val) => {
tracing::debug!("OCR Queue: completed_today = {}", val);
val
}
Err(e) => {
tracing::error!("OCR Queue: Failed to get completed_today: {}", e);
return Err(anyhow::anyhow!("Failed to get completed_today: {}", e));
}
};
let avg_wait_time_minutes = match stats.try_get::<Option<f64>, _>("avg_wait_time_minutes") {
Ok(val) => {
tracing::debug!("OCR Queue: avg_wait_time_minutes = {:?}", val);
val
}
Err(e) => {
tracing::error!("OCR Queue: Failed to get avg_wait_time_minutes: {}", e);
return Err(anyhow::anyhow!("Failed to get avg_wait_time_minutes: {}", e));
}
};
let oldest_pending_minutes = match stats.try_get::<Option<f64>, _>("oldest_pending_minutes") {
Ok(val) => {
tracing::debug!("OCR Queue: oldest_pending_minutes = {:?}", val);
val
}
Err(e) => {
tracing::error!("OCR Queue: Failed to get oldest_pending_minutes: {}", e);
return Err(anyhow::anyhow!("Failed to get oldest_pending_minutes: {}", e));
}
};
tracing::debug!("OCR Queue: Successfully extracted all stats fields");
Ok(QueueStats {
pending_count: stats.get::<Option<i64>, _>("pending_count").unwrap_or(0),
processing_count: stats.get::<Option<i64>, _>("processing_count").unwrap_or(0),
failed_count: stats.get::<Option<i64>, _>("failed_count").unwrap_or(0),
completed_today: stats.get::<Option<i64>, _>("completed_today").unwrap_or(0),
avg_wait_time_minutes: stats.get("avg_wait_time_minutes"),
oldest_pending_minutes: stats.get("oldest_pending_minutes"),
pending_count,
processing_count,
failed_count,
completed_today,
avg_wait_time_minutes,
oldest_pending_minutes,
})
}

View File

@@ -33,6 +33,12 @@ pub fn router() -> Router<Arc<AppState>> {
.route("/ocr/stats", get(get_ocr_stats))
.route("/{id}/ocr/stop", post(cancel_ocr))
// OCR retry operations
.route("/ocr/retry-stats", get(crate::routes::documents_ocr_retry::get_ocr_retry_stats))
.route("/ocr/retry-recommendations", get(crate::routes::documents_ocr_retry::get_retry_recommendations))
.route("/ocr/bulk-retry", post(crate::routes::documents_ocr_retry::bulk_retry_ocr))
.route("/{id}/ocr/retry-history", get(crate::routes::documents_ocr_retry::get_document_retry_history))
// Bulk operations
.route("/bulk/delete", post(bulk_delete_documents))
.route("/cleanup/low-confidence", delete(delete_low_confidence_documents))

View File

@@ -29,11 +29,15 @@ pub fn router() -> Router<Arc<AppState>> {
pub async fn get_prometheus_metrics(
State(state): State<Arc<AppState>>,
) -> Result<Response, StatusCode> {
tracing::debug!("Prometheus: get_prometheus_metrics endpoint called");
let mut output = String::new();
// Get current timestamp
let timestamp = chrono::Utc::now().timestamp_millis();
tracing::debug!("Prometheus: Starting to collect all metrics");
// Collect all metrics
let (document_metrics, ocr_metrics, user_metrics, database_metrics, system_metrics, storage_metrics, security_metrics) = tokio::try_join!(
collect_document_metrics(&state),
@@ -43,7 +47,12 @@ pub async fn get_prometheus_metrics(
collect_system_metrics(&state),
collect_storage_metrics(&state),
collect_security_metrics(&state)
)?;
).map_err(|e| {
tracing::error!("Prometheus: Failed to collect metrics: {:?}", e);
e
})?;
tracing::debug!("Prometheus: Successfully collected all metrics, formatting output");
// Write Prometheus formatted metrics
@@ -299,20 +308,27 @@ async fn collect_document_metrics(state: &Arc<AppState>) -> Result<DocumentMetri
async fn collect_ocr_metrics(state: &Arc<AppState>) -> Result<OcrMetrics, StatusCode> {
use crate::ocr::queue::OcrQueueService;
tracing::debug!("Prometheus: Starting collect_ocr_metrics");
let queue_service = OcrQueueService::new(
state.db.clone(),
state.db.pool.clone(),
state.config.concurrent_ocr_jobs
);
tracing::debug!("Prometheus: Created OCR queue service, calling get_stats()");
let stats = queue_service
.get_stats()
.await
.map_err(|e| {
tracing::error!("Failed to get OCR stats: {}", e);
tracing::error!("Prometheus: Failed to get OCR stats: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
tracing::debug!("Prometheus: Successfully got OCR stats: pending={}, processing={}, failed={}, completed_today={}",
stats.pending_count, stats.processing_count, stats.failed_count, stats.completed_today);
// Get additional OCR metrics
let stuck_jobs = sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM documents WHERE ocr_status = 'processing' AND updated_at < NOW() - INTERVAL '30 minutes'"

View File

@@ -14,6 +14,8 @@ use axum::Router;
#[cfg(any(test, feature = "test-utils"))]
use serde_json::json;
#[cfg(any(test, feature = "test-utils"))]
use uuid;
#[cfg(any(test, feature = "test-utils"))]
use testcontainers::{runners::AsyncRunner, ContainerAsync, ImageExt};
#[cfg(any(test, feature = "test-utils"))]
use testcontainers_modules::postgres::Postgres;
@@ -323,12 +325,15 @@ impl TestAuthHelper {
/// Create a regular test user with unique credentials
pub async fn create_test_user(&self) -> TestUser {
let test_id = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
.to_string()[..8]
.to_string();
// Generate a more unique ID using process ID, thread ID (as debug string), and nanoseconds
let test_id = format!("{}_{}_{}",
std::process::id(),
format!("{:?}", std::thread::current().id()).replace("ThreadId(", "").replace(")", ""),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let username = format!("testuser_{}", test_id);
let email = format!("test_{}@example.com", test_id);
let password = "password123";
@@ -350,7 +355,51 @@ impl TestAuthHelper {
let user_response = match serde_json::from_slice::<serde_json::Value>(&response) {
Ok(json_value) => {
println!("DEBUG: Parsed JSON structure: {:#}", json_value);
// Now try to parse as UserResponse
// Check if this is an error response due to username collision
if let Some(error_msg) = json_value.get("error").and_then(|e| e.as_str()) {
if error_msg.contains("Username already exists") {
println!("DEBUG: Username collision detected, retrying with UUID suffix");
// Retry with a UUID suffix for guaranteed uniqueness
let retry_username = format!("{}_{}",
username,
uuid::Uuid::new_v4().to_string().replace('-', "")[..8].to_string()
);
let retry_email = format!("test_{}@example.com",
uuid::Uuid::new_v4().to_string().replace('-', "")[..16].to_string()
);
let retry_user_data = json!({
"username": retry_username,
"email": retry_email,
"password": password
});
let retry_response = self.make_request("POST", "/api/auth/register", Some(retry_user_data), None).await;
let retry_response_str = String::from_utf8_lossy(&retry_response);
println!("DEBUG: Retry register response body: {}", retry_response_str);
let retry_json_value = serde_json::from_slice::<serde_json::Value>(&retry_response)
.expect("Retry response should be valid JSON");
match serde_json::from_value::<UserResponse>(retry_json_value) {
Ok(user_response) => {
return TestUser {
user_response,
username: retry_username,
password: password.to_string(),
token: None,
};
},
Err(e) => {
eprintln!("ERROR: Failed to parse UserResponse from retry JSON: {}", e);
panic!("Failed to parse UserResponse from retry: {}", e);
}
}
}
}
// Try to parse as UserResponse
match serde_json::from_value::<UserResponse>(json_value) {
Ok(user_response) => user_response,
Err(e) => {
@@ -377,12 +426,15 @@ impl TestAuthHelper {
/// Create an admin test user with unique credentials
pub async fn create_admin_user(&self) -> TestUser {
let test_id = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
.to_string()[..8]
.to_string();
// Generate a more unique ID using process ID, thread ID (as debug string), and nanoseconds
let test_id = format!("{}_{}_{}",
std::process::id(),
format!("{:?}", std::thread::current().id()).replace("ThreadId(", "").replace(")", ""),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let username = format!("adminuser_{}", test_id);
let email = format!("admin_{}@example.com", test_id);
let password = "adminpass123";
@@ -405,7 +457,52 @@ impl TestAuthHelper {
let user_response = match serde_json::from_slice::<serde_json::Value>(&response) {
Ok(json_value) => {
println!("DEBUG: Admin parsed JSON structure: {:#}", json_value);
// Now try to parse as UserResponse
// Check if this is an error response due to username collision
if let Some(error_msg) = json_value.get("error").and_then(|e| e.as_str()) {
if error_msg.contains("Username already exists") {
println!("DEBUG: Admin username collision detected, retrying with UUID suffix");
// Retry with a UUID suffix for guaranteed uniqueness
let retry_username = format!("{}_{}",
username,
uuid::Uuid::new_v4().to_string().replace('-', "")[..8].to_string()
);
let retry_email = format!("admin_{}@example.com",
uuid::Uuid::new_v4().to_string().replace('-', "")[..16].to_string()
);
let retry_admin_data = json!({
"username": retry_username,
"email": retry_email,
"password": password,
"role": "admin"
});
let retry_response = self.make_request("POST", "/api/auth/register", Some(retry_admin_data), None).await;
let retry_response_str = String::from_utf8_lossy(&retry_response);
println!("DEBUG: Retry admin register response body: {}", retry_response_str);
let retry_json_value = serde_json::from_slice::<serde_json::Value>(&retry_response)
.expect("Retry admin response should be valid JSON");
match serde_json::from_value::<UserResponse>(retry_json_value) {
Ok(user_response) => {
return TestUser {
user_response,
username: retry_username,
password: password.to_string(),
token: None,
};
},
Err(e) => {
eprintln!("ERROR: Failed to parse UserResponse from retry admin JSON: {}", e);
panic!("Failed to parse UserResponse from retry admin: {}", e);
}
}
}
}
// Try to parse as UserResponse
match serde_json::from_value::<UserResponse>(json_value) {
Ok(user_response) => user_response,
Err(e) => {

View File

@@ -216,10 +216,7 @@ impl FileProcessingTestClient {
.await?;
if response.status().is_success() {
let response_json: serde_json::Value = response.json().await?;
let documents: Vec<DocumentResponse> = serde_json::from_value(
response_json["documents"].clone()
)?;
let documents: Vec<DocumentResponse> = response.json().await?;
if let Some(doc) = documents.iter().find(|d| d.id.to_string() == document_id) {
println!("📄 DEBUG: Found document with OCR status: {:?}", doc.ocr_status);
@@ -582,11 +579,8 @@ async fn test_image_processing_pipeline() {
.await
.expect("Failed to get documents");
let response_json: serde_json::Value = response.json().await
let documents: Vec<DocumentResponse> = response.json().await
.expect("Failed to parse response");
let documents: Vec<DocumentResponse> = serde_json::from_value(
response_json["documents"].clone()
).expect("Failed to parse documents");
documents.into_iter()
.find(|d| d.id.to_string() == document_id)
@@ -702,26 +696,22 @@ async fn test_processing_error_recovery() {
.await;
if let Ok(resp) = response {
if let Ok(response_json) = resp.json::<serde_json::Value>().await {
if let Ok(docs) = serde_json::from_value::<Vec<DocumentResponse>>(
response_json["documents"].clone()
) {
if let Some(doc) = docs.iter().find(|d| d.id.to_string() == document.document_id.to_string()) {
match doc.ocr_status.as_deref() {
Some("completed") => {
println!("✅ Large file processing completed");
break;
if let Ok(docs) = resp.json::<Vec<DocumentResponse>>().await {
if let Some(doc) = docs.iter().find(|d| d.id.to_string() == document.document_id.to_string()) {
match doc.ocr_status.as_deref() {
Some("completed") => {
println!("✅ Large file processing completed");
break;
}
Some("failed") => {
println!(" Large file processing failed (may be expected for very large files)");
break;
}
_ => {
sleep(Duration::from_secs(2)).await;
continue;
}
}
Some("failed") => {
println!(" Large file processing failed (may be expected for very large files)");
break;
}
_ => {
sleep(Duration::from_secs(2)).await;
continue;
}
}
}
}
}
}
@@ -997,11 +987,8 @@ async fn test_concurrent_file_processing() {
.expect("Should get documents");
if response.status().is_success() {
let response_json: serde_json::Value = response.json().await
let documents: Vec<DocumentResponse> = response.json().await
.expect("Should parse response");
let documents: Vec<DocumentResponse> = serde_json::from_value(
response_json["documents"].clone()
).expect("Should parse documents");
if let Some(doc) = documents.iter().find(|d| d.id.to_string() == document_id) {
match doc.ocr_status.as_deref() {

View File

@@ -6,10 +6,10 @@ use reqwest::Client;
use serde_json::Value;
use std::time::{Duration, Instant};
use tokio::time::sleep;
use uuid::Uuid;
use futures;
use readur::models::{DocumentResponse, CreateUser, LoginRequest, LoginResponse};
use readur::models::{CreateUser, LoginRequest, LoginResponse};
use readur::routes::documents::types::DocumentUploadResponse;
fn get_base_url() -> String {
std::env::var("API_URL").unwrap_or_else(|_| "http://localhost:8000".to_string())
@@ -63,7 +63,7 @@ impl Investigator {
Self { client, token }
}
async fn upload_document(&self, content: &str, filename: &str) -> DocumentResponse {
async fn upload_document(&self, content: &str, filename: &str) -> DocumentUploadResponse {
let part = reqwest::multipart::Part::text(content.to_string())
.file_name(filename.to_string())
.mime_str("text/plain")
@@ -160,7 +160,7 @@ async fn investigate_empty_content_issue() {
let mut sample_results = Vec::new();
for (i, doc) in uploaded_docs.iter().enumerate().take(3) { // Sample first 3 docs
let details = investigator.get_document_details(&doc.id.to_string()).await;
let details = investigator.get_document_details(&doc.document_id.to_string()).await;
let status = details["ocr_status"].as_str().unwrap_or("unknown");
let ocr_text = details["ocr_text"].as_str().unwrap_or("");
let expected = &documents[i].0;
@@ -169,7 +169,7 @@ async fn investigate_empty_content_issue() {
current_completed += 1;
}
sample_results.push((doc.id.to_string(), status.to_string(), expected.clone(), ocr_text.to_string()));
sample_results.push((doc.document_id.to_string(), status.to_string(), expected.clone(), ocr_text.to_string()));
}
// Estimate total completed (this is rough but gives us an idea)
@@ -207,7 +207,7 @@ async fn investigate_empty_content_issue() {
let mut other_corruption = 0;
for (i, doc) in uploaded_docs.iter().enumerate() {
let details = investigator.get_document_details(&doc.id.to_string()).await;
let details = investigator.get_document_details(&doc.document_id.to_string()).await;
let status = details["ocr_status"].as_str().unwrap_or("unknown");
let ocr_text = details["ocr_text"].as_str().unwrap_or("");
let expected = &documents[i].0;

View File

@@ -397,7 +397,7 @@ async fn test_document_label_assignment() -> Result<(), Box<dyn std::error::Erro
println!("Uploading test document...");
let document_content = b"This is a test document for label assignment testing.";
let document = client.upload_document(document_content, "test_document.txt").await?;
let document_id = document["id"].as_str().unwrap();
let document_id = document["document_id"].as_str().unwrap();
// Test: Assign single label
println!("Testing single label assignment...");
@@ -501,7 +501,7 @@ async fn test_system_labels_access() -> Result<(), Box<dyn std::error::Error>> {
println!("Testing system label assignment...");
let document_content = b"This is a test document for system label assignment.";
let document = client.upload_document(document_content, "system_label_test.txt").await?;
let document_id = document["id"].as_str().unwrap();
let document_id = document["document_id"].as_str().unwrap();
// Find a system label to assign
let important_label = system_labels.iter()

View File

@@ -210,7 +210,8 @@ mod tests {
let auth_helper = TestAuthHelper::new(ctx.app.clone());
let user = auth_helper.create_test_user().await;
// Create system label
// Create system label with unique name
let unique_label_name = format!("System Label {}", uuid::Uuid::new_v4());
let label_id = sqlx::query_scalar::<_, uuid::Uuid>(
r#"
INSERT INTO labels (user_id, name, color, is_system)
@@ -219,7 +220,7 @@ mod tests {
"#,
)
.bind(None::<Uuid>) // System labels have NULL user_id
.bind("System Label")
.bind(&unique_label_name)
.bind("#ff0000")
.bind(true)
.fetch_one(&ctx.state.db.pool)
@@ -247,6 +248,13 @@ mod tests {
.await;
assert!(system_label.is_ok());
// Cleanup: Remove the test system label
sqlx::query("DELETE FROM labels WHERE id = $1")
.bind(label_id)
.execute(&ctx.state.db.pool)
.await
.expect("Failed to cleanup test system label");
}
#[tokio::test]

View File

@@ -227,10 +227,7 @@ impl OCRQueueTestClient {
.await?;
if response.status().is_success() {
let response_json: serde_json::Value = response.json().await?;
let documents: Vec<DocumentResponse> = serde_json::from_value(
response_json["documents"].clone()
)?;
let documents: Vec<DocumentResponse> = response.json().await?;
for (i, doc_id) in document_ids.iter().enumerate() {
if !completed_status[i] {
@@ -278,10 +275,7 @@ impl OCRQueueTestClient {
return Err(format!("Get documents failed: {}", response.text().await?).into());
}
let response_json: serde_json::Value = response.json().await?;
let documents: Vec<DocumentResponse> = serde_json::from_value(
response_json["documents"].clone()
)?;
let documents: Vec<DocumentResponse> = response.json().await?;
Ok(documents)
}
}

View File

@@ -247,7 +247,7 @@ impl OcrRetryTestHelper {
}
let upload_result: Value = response.json().await?;
let doc_id = upload_result["id"].as_str()
let doc_id = upload_result["document_id"].as_str()
.ok_or("No document ID in upload response")?
.to_string();

View File

@@ -239,11 +239,8 @@ impl LoadTestClient {
return Err(format!("List documents failed: {}", response.text().await?).into());
}
let response_json: serde_json::Value = response.json().await?;
let documents = response_json["documents"].as_array()
.ok_or("Invalid response format: missing documents array")?
.clone();
Ok((documents, elapsed))
let documents_array: Vec<serde_json::Value> = response.json().await?;
Ok((documents_array, elapsed))
}
/// Perform a timed search request

View File

@@ -223,10 +223,7 @@ impl RBACTestClient {
return Err(format!("Get documents failed: {}", response.text().await?).into());
}
let response_json: Value = response.json().await?;
let documents: Vec<Value> = serde_json::from_value(
response_json["documents"].clone()
)?;
let documents: Vec<Value> = response.json().await?;
Ok(documents)
}