diff --git a/src/ocr/queue.rs b/src/ocr/queue.rs index d0b3d8e..10e5e8a 100644 --- a/src/ocr/queue.rs +++ b/src/ocr/queue.rs @@ -1,7 +1,7 @@ use anyhow::Result; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use sqlx::{FromRow, PgPool, Row}; +use sqlx::{FromRow, PgPool, Row, Column}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use tokio::sync::Semaphore; @@ -793,21 +793,104 @@ impl OcrQueueService { /// Get queue statistics pub async fn get_stats(&self) -> Result { + tracing::debug!("OCR Queue: Starting get_stats() call"); + let stats = sqlx::query( r#" SELECT * FROM get_ocr_queue_stats() "# ) .fetch_one(&self.pool) - .await?; + .await + .map_err(|e| { + tracing::error!("OCR Queue: Failed to execute get_ocr_queue_stats(): {}", e); + e + })?; + + tracing::debug!("OCR Queue: Successfully fetched stats row"); + + // Debug: Print all column names and their types + let columns = stats.columns(); + for (i, column) in columns.iter().enumerate() { + tracing::debug!("OCR Queue: Column {}: name='{}', type='{:?}'", i, column.name(), column.type_info()); + } + + // Try to extract each field with detailed error handling + let pending_count = match stats.try_get::("pending_count") { + Ok(val) => { + tracing::debug!("OCR Queue: pending_count = {}", val); + val + } + Err(e) => { + tracing::error!("OCR Queue: Failed to get pending_count: {}", e); + return Err(anyhow::anyhow!("Failed to get pending_count: {}", e)); + } + }; + + let processing_count = match stats.try_get::("processing_count") { + Ok(val) => { + tracing::debug!("OCR Queue: processing_count = {}", val); + val + } + Err(e) => { + tracing::error!("OCR Queue: Failed to get processing_count: {}", e); + return Err(anyhow::anyhow!("Failed to get processing_count: {}", e)); + } + }; + + let failed_count = match stats.try_get::("failed_count") { + Ok(val) => { + tracing::debug!("OCR Queue: failed_count = {}", val); + val + } + Err(e) => { + tracing::error!("OCR Queue: Failed to get failed_count: {}", e); + return Err(anyhow::anyhow!("Failed to get failed_count: {}", e)); + } + }; + + let completed_today = match stats.try_get::("completed_today") { + Ok(val) => { + tracing::debug!("OCR Queue: completed_today = {}", val); + val + } + Err(e) => { + tracing::error!("OCR Queue: Failed to get completed_today: {}", e); + return Err(anyhow::anyhow!("Failed to get completed_today: {}", e)); + } + }; + + let avg_wait_time_minutes = match stats.try_get::, _>("avg_wait_time_minutes") { + Ok(val) => { + tracing::debug!("OCR Queue: avg_wait_time_minutes = {:?}", val); + val + } + Err(e) => { + tracing::error!("OCR Queue: Failed to get avg_wait_time_minutes: {}", e); + return Err(anyhow::anyhow!("Failed to get avg_wait_time_minutes: {}", e)); + } + }; + + let oldest_pending_minutes = match stats.try_get::, _>("oldest_pending_minutes") { + Ok(val) => { + tracing::debug!("OCR Queue: oldest_pending_minutes = {:?}", val); + val + } + Err(e) => { + tracing::error!("OCR Queue: Failed to get oldest_pending_minutes: {}", e); + return Err(anyhow::anyhow!("Failed to get oldest_pending_minutes: {}", e)); + } + }; + + tracing::debug!("OCR Queue: Successfully extracted all stats fields"); Ok(QueueStats { - pending_count: stats.get::, _>("pending_count").unwrap_or(0), - processing_count: stats.get::, _>("processing_count").unwrap_or(0), - failed_count: stats.get::, _>("failed_count").unwrap_or(0), - completed_today: stats.get::, _>("completed_today").unwrap_or(0), - avg_wait_time_minutes: stats.get("avg_wait_time_minutes"), - oldest_pending_minutes: stats.get("oldest_pending_minutes"), + pending_count, + processing_count, + failed_count, + completed_today, + avg_wait_time_minutes, + oldest_pending_minutes, }) } diff --git a/src/routes/documents/mod.rs b/src/routes/documents/mod.rs index b6c5eba..2dcda4b 100644 --- a/src/routes/documents/mod.rs +++ b/src/routes/documents/mod.rs @@ -33,6 +33,12 @@ pub fn router() -> Router> { .route("/ocr/stats", get(get_ocr_stats)) .route("/{id}/ocr/stop", post(cancel_ocr)) + // OCR retry operations + .route("/ocr/retry-stats", get(crate::routes::documents_ocr_retry::get_ocr_retry_stats)) + .route("/ocr/retry-recommendations", get(crate::routes::documents_ocr_retry::get_retry_recommendations)) + .route("/ocr/bulk-retry", post(crate::routes::documents_ocr_retry::bulk_retry_ocr)) + .route("/{id}/ocr/retry-history", get(crate::routes::documents_ocr_retry::get_document_retry_history)) + // Bulk operations .route("/bulk/delete", post(bulk_delete_documents)) .route("/cleanup/low-confidence", delete(delete_low_confidence_documents)) diff --git a/src/routes/prometheus_metrics.rs b/src/routes/prometheus_metrics.rs index ed05260..cbe976a 100644 --- a/src/routes/prometheus_metrics.rs +++ b/src/routes/prometheus_metrics.rs @@ -29,11 +29,15 @@ pub fn router() -> Router> { pub async fn get_prometheus_metrics( State(state): State>, ) -> Result { + tracing::debug!("Prometheus: get_prometheus_metrics endpoint called"); + let mut output = String::new(); // Get current timestamp let timestamp = chrono::Utc::now().timestamp_millis(); + tracing::debug!("Prometheus: Starting to collect all metrics"); + // Collect all metrics let (document_metrics, ocr_metrics, user_metrics, database_metrics, system_metrics, storage_metrics, security_metrics) = tokio::try_join!( collect_document_metrics(&state), @@ -43,7 +47,12 @@ pub async fn get_prometheus_metrics( collect_system_metrics(&state), collect_storage_metrics(&state), collect_security_metrics(&state) - )?; + ).map_err(|e| { + tracing::error!("Prometheus: Failed to collect metrics: {:?}", e); + e + })?; + + tracing::debug!("Prometheus: Successfully collected all metrics, formatting output"); // Write Prometheus formatted metrics @@ -299,20 +308,27 @@ async fn collect_document_metrics(state: &Arc) -> Result) -> Result { use crate::ocr::queue::OcrQueueService; + tracing::debug!("Prometheus: Starting collect_ocr_metrics"); + let queue_service = OcrQueueService::new( state.db.clone(), state.db.pool.clone(), state.config.concurrent_ocr_jobs ); + tracing::debug!("Prometheus: Created OCR queue service, calling get_stats()"); + let stats = queue_service .get_stats() .await .map_err(|e| { - tracing::error!("Failed to get OCR stats: {}", e); + tracing::error!("Prometheus: Failed to get OCR stats: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; + tracing::debug!("Prometheus: Successfully got OCR stats: pending={}, processing={}, failed={}, completed_today={}", + stats.pending_count, stats.processing_count, stats.failed_count, stats.completed_today); + // Get additional OCR metrics let stuck_jobs = sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM documents WHERE ocr_status = 'processing' AND updated_at < NOW() - INTERVAL '30 minutes'" diff --git a/src/test_utils.rs b/src/test_utils.rs index a8a2551..82836f2 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -14,6 +14,8 @@ use axum::Router; #[cfg(any(test, feature = "test-utils"))] use serde_json::json; #[cfg(any(test, feature = "test-utils"))] +use uuid; +#[cfg(any(test, feature = "test-utils"))] use testcontainers::{runners::AsyncRunner, ContainerAsync, ImageExt}; #[cfg(any(test, feature = "test-utils"))] use testcontainers_modules::postgres::Postgres; @@ -323,12 +325,15 @@ impl TestAuthHelper { /// Create a regular test user with unique credentials pub async fn create_test_user(&self) -> TestUser { - let test_id = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_nanos() - .to_string()[..8] - .to_string(); + // Generate a more unique ID using process ID, thread ID (as debug string), and nanoseconds + let test_id = format!("{}_{}_{}", + std::process::id(), + format!("{:?}", std::thread::current().id()).replace("ThreadId(", "").replace(")", ""), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + ); let username = format!("testuser_{}", test_id); let email = format!("test_{}@example.com", test_id); let password = "password123"; @@ -350,7 +355,51 @@ impl TestAuthHelper { let user_response = match serde_json::from_slice::(&response) { Ok(json_value) => { println!("DEBUG: Parsed JSON structure: {:#}", json_value); - // Now try to parse as UserResponse + + // Check if this is an error response due to username collision + if let Some(error_msg) = json_value.get("error").and_then(|e| e.as_str()) { + if error_msg.contains("Username already exists") { + println!("DEBUG: Username collision detected, retrying with UUID suffix"); + // Retry with a UUID suffix for guaranteed uniqueness + let retry_username = format!("{}_{}", + username, + uuid::Uuid::new_v4().to_string().replace('-', "")[..8].to_string() + ); + let retry_email = format!("test_{}@example.com", + uuid::Uuid::new_v4().to_string().replace('-', "")[..16].to_string() + ); + + let retry_user_data = json!({ + "username": retry_username, + "email": retry_email, + "password": password + }); + + let retry_response = self.make_request("POST", "/api/auth/register", Some(retry_user_data), None).await; + let retry_response_str = String::from_utf8_lossy(&retry_response); + println!("DEBUG: Retry register response body: {}", retry_response_str); + + let retry_json_value = serde_json::from_slice::(&retry_response) + .expect("Retry response should be valid JSON"); + + match serde_json::from_value::(retry_json_value) { + Ok(user_response) => { + return TestUser { + user_response, + username: retry_username, + password: password.to_string(), + token: None, + }; + }, + Err(e) => { + eprintln!("ERROR: Failed to parse UserResponse from retry JSON: {}", e); + panic!("Failed to parse UserResponse from retry: {}", e); + } + } + } + } + + // Try to parse as UserResponse match serde_json::from_value::(json_value) { Ok(user_response) => user_response, Err(e) => { @@ -377,12 +426,15 @@ impl TestAuthHelper { /// Create an admin test user with unique credentials pub async fn create_admin_user(&self) -> TestUser { - let test_id = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_nanos() - .to_string()[..8] - .to_string(); + // Generate a more unique ID using process ID, thread ID (as debug string), and nanoseconds + let test_id = format!("{}_{}_{}", + std::process::id(), + format!("{:?}", std::thread::current().id()).replace("ThreadId(", "").replace(")", ""), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + ); let username = format!("adminuser_{}", test_id); let email = format!("admin_{}@example.com", test_id); let password = "adminpass123"; @@ -405,7 +457,52 @@ impl TestAuthHelper { let user_response = match serde_json::from_slice::(&response) { Ok(json_value) => { println!("DEBUG: Admin parsed JSON structure: {:#}", json_value); - // Now try to parse as UserResponse + + // Check if this is an error response due to username collision + if let Some(error_msg) = json_value.get("error").and_then(|e| e.as_str()) { + if error_msg.contains("Username already exists") { + println!("DEBUG: Admin username collision detected, retrying with UUID suffix"); + // Retry with a UUID suffix for guaranteed uniqueness + let retry_username = format!("{}_{}", + username, + uuid::Uuid::new_v4().to_string().replace('-', "")[..8].to_string() + ); + let retry_email = format!("admin_{}@example.com", + uuid::Uuid::new_v4().to_string().replace('-', "")[..16].to_string() + ); + + let retry_admin_data = json!({ + "username": retry_username, + "email": retry_email, + "password": password, + "role": "admin" + }); + + let retry_response = self.make_request("POST", "/api/auth/register", Some(retry_admin_data), None).await; + let retry_response_str = String::from_utf8_lossy(&retry_response); + println!("DEBUG: Retry admin register response body: {}", retry_response_str); + + let retry_json_value = serde_json::from_slice::(&retry_response) + .expect("Retry admin response should be valid JSON"); + + match serde_json::from_value::(retry_json_value) { + Ok(user_response) => { + return TestUser { + user_response, + username: retry_username, + password: password.to_string(), + token: None, + }; + }, + Err(e) => { + eprintln!("ERROR: Failed to parse UserResponse from retry admin JSON: {}", e); + panic!("Failed to parse UserResponse from retry admin: {}", e); + } + } + } + } + + // Try to parse as UserResponse match serde_json::from_value::(json_value) { Ok(user_response) => user_response, Err(e) => { diff --git a/tests/integration_file_processing_pipeline_tests.rs b/tests/integration_file_processing_pipeline_tests.rs index 620b53f..2337ff5 100644 --- a/tests/integration_file_processing_pipeline_tests.rs +++ b/tests/integration_file_processing_pipeline_tests.rs @@ -216,10 +216,7 @@ impl FileProcessingTestClient { .await?; if response.status().is_success() { - let response_json: serde_json::Value = response.json().await?; - let documents: Vec = serde_json::from_value( - response_json["documents"].clone() - )?; + let documents: Vec = response.json().await?; if let Some(doc) = documents.iter().find(|d| d.id.to_string() == document_id) { println!("📄 DEBUG: Found document with OCR status: {:?}", doc.ocr_status); @@ -582,11 +579,8 @@ async fn test_image_processing_pipeline() { .await .expect("Failed to get documents"); - let response_json: serde_json::Value = response.json().await + let documents: Vec = response.json().await .expect("Failed to parse response"); - let documents: Vec = serde_json::from_value( - response_json["documents"].clone() - ).expect("Failed to parse documents"); documents.into_iter() .find(|d| d.id.to_string() == document_id) @@ -702,26 +696,22 @@ async fn test_processing_error_recovery() { .await; if let Ok(resp) = response { - if let Ok(response_json) = resp.json::().await { - if let Ok(docs) = serde_json::from_value::>( - response_json["documents"].clone() - ) { - if let Some(doc) = docs.iter().find(|d| d.id.to_string() == document.document_id.to_string()) { - match doc.ocr_status.as_deref() { - Some("completed") => { - println!("✅ Large file processing completed"); - break; + if let Ok(docs) = resp.json::>().await { + if let Some(doc) = docs.iter().find(|d| d.id.to_string() == document.document_id.to_string()) { + match doc.ocr_status.as_deref() { + Some("completed") => { + println!("✅ Large file processing completed"); + break; + } + Some("failed") => { + println!("â„šī¸ Large file processing failed (may be expected for very large files)"); + break; + } + _ => { + sleep(Duration::from_secs(2)).await; + continue; + } } - Some("failed") => { - println!("â„šī¸ Large file processing failed (may be expected for very large files)"); - break; - } - _ => { - sleep(Duration::from_secs(2)).await; - continue; - } - } - } } } } @@ -997,11 +987,8 @@ async fn test_concurrent_file_processing() { .expect("Should get documents"); if response.status().is_success() { - let response_json: serde_json::Value = response.json().await + let documents: Vec = response.json().await .expect("Should parse response"); - let documents: Vec = serde_json::from_value( - response_json["documents"].clone() - ).expect("Should parse documents"); if let Some(doc) = documents.iter().find(|d| d.id.to_string() == document_id) { match doc.ocr_status.as_deref() { diff --git a/tests/integration_investigate_empty_content.rs b/tests/integration_investigate_empty_content.rs index 8b07ffb..6e611de 100644 --- a/tests/integration_investigate_empty_content.rs +++ b/tests/integration_investigate_empty_content.rs @@ -6,10 +6,10 @@ use reqwest::Client; use serde_json::Value; use std::time::{Duration, Instant}; use tokio::time::sleep; -use uuid::Uuid; use futures; -use readur::models::{DocumentResponse, CreateUser, LoginRequest, LoginResponse}; +use readur::models::{CreateUser, LoginRequest, LoginResponse}; +use readur::routes::documents::types::DocumentUploadResponse; fn get_base_url() -> String { std::env::var("API_URL").unwrap_or_else(|_| "http://localhost:8000".to_string()) @@ -63,7 +63,7 @@ impl Investigator { Self { client, token } } - async fn upload_document(&self, content: &str, filename: &str) -> DocumentResponse { + async fn upload_document(&self, content: &str, filename: &str) -> DocumentUploadResponse { let part = reqwest::multipart::Part::text(content.to_string()) .file_name(filename.to_string()) .mime_str("text/plain") @@ -160,7 +160,7 @@ async fn investigate_empty_content_issue() { let mut sample_results = Vec::new(); for (i, doc) in uploaded_docs.iter().enumerate().take(3) { // Sample first 3 docs - let details = investigator.get_document_details(&doc.id.to_string()).await; + let details = investigator.get_document_details(&doc.document_id.to_string()).await; let status = details["ocr_status"].as_str().unwrap_or("unknown"); let ocr_text = details["ocr_text"].as_str().unwrap_or(""); let expected = &documents[i].0; @@ -169,7 +169,7 @@ async fn investigate_empty_content_issue() { current_completed += 1; } - sample_results.push((doc.id.to_string(), status.to_string(), expected.clone(), ocr_text.to_string())); + sample_results.push((doc.document_id.to_string(), status.to_string(), expected.clone(), ocr_text.to_string())); } // Estimate total completed (this is rough but gives us an idea) @@ -207,7 +207,7 @@ async fn investigate_empty_content_issue() { let mut other_corruption = 0; for (i, doc) in uploaded_docs.iter().enumerate() { - let details = investigator.get_document_details(&doc.id.to_string()).await; + let details = investigator.get_document_details(&doc.document_id.to_string()).await; let status = details["ocr_status"].as_str().unwrap_or("unknown"); let ocr_text = details["ocr_text"].as_str().unwrap_or(""); let expected = &documents[i].0; diff --git a/tests/integration_labels_integration_tests.rs b/tests/integration_labels_integration_tests.rs index 1394c2e..6f68738 100644 --- a/tests/integration_labels_integration_tests.rs +++ b/tests/integration_labels_integration_tests.rs @@ -397,7 +397,7 @@ async fn test_document_label_assignment() -> Result<(), Box Result<(), Box> { println!("Testing system label assignment..."); let document_content = b"This is a test document for system label assignment."; let document = client.upload_document(document_content, "system_label_test.txt").await?; - let document_id = document["id"].as_str().unwrap(); + let document_id = document["document_id"].as_str().unwrap(); // Find a system label to assign let important_label = system_labels.iter() diff --git a/tests/integration_labels_tests.rs b/tests/integration_labels_tests.rs index 34576e5..c761346 100644 --- a/tests/integration_labels_tests.rs +++ b/tests/integration_labels_tests.rs @@ -210,7 +210,8 @@ mod tests { let auth_helper = TestAuthHelper::new(ctx.app.clone()); let user = auth_helper.create_test_user().await; - // Create system label + // Create system label with unique name + let unique_label_name = format!("System Label {}", uuid::Uuid::new_v4()); let label_id = sqlx::query_scalar::<_, uuid::Uuid>( r#" INSERT INTO labels (user_id, name, color, is_system) @@ -219,7 +220,7 @@ mod tests { "#, ) .bind(None::) // System labels have NULL user_id - .bind("System Label") + .bind(&unique_label_name) .bind("#ff0000") .bind(true) .fetch_one(&ctx.state.db.pool) @@ -247,6 +248,13 @@ mod tests { .await; assert!(system_label.is_ok()); + + // Cleanup: Remove the test system label + sqlx::query("DELETE FROM labels WHERE id = $1") + .bind(label_id) + .execute(&ctx.state.db.pool) + .await + .expect("Failed to cleanup test system label"); } #[tokio::test] diff --git a/tests/integration_ocr_queue_management_tests.rs b/tests/integration_ocr_queue_management_tests.rs index a20e942..98f99fc 100644 --- a/tests/integration_ocr_queue_management_tests.rs +++ b/tests/integration_ocr_queue_management_tests.rs @@ -227,10 +227,7 @@ impl OCRQueueTestClient { .await?; if response.status().is_success() { - let response_json: serde_json::Value = response.json().await?; - let documents: Vec = serde_json::from_value( - response_json["documents"].clone() - )?; + let documents: Vec = response.json().await?; for (i, doc_id) in document_ids.iter().enumerate() { if !completed_status[i] { @@ -278,10 +275,7 @@ impl OCRQueueTestClient { return Err(format!("Get documents failed: {}", response.text().await?).into()); } - let response_json: serde_json::Value = response.json().await?; - let documents: Vec = serde_json::from_value( - response_json["documents"].clone() - )?; + let documents: Vec = response.json().await?; Ok(documents) } } diff --git a/tests/integration_ocr_retry_tests.rs b/tests/integration_ocr_retry_tests.rs index 17ab218..007eb18 100644 --- a/tests/integration_ocr_retry_tests.rs +++ b/tests/integration_ocr_retry_tests.rs @@ -247,7 +247,7 @@ impl OcrRetryTestHelper { } let upload_result: Value = response.json().await?; - let doc_id = upload_result["id"].as_str() + let doc_id = upload_result["document_id"].as_str() .ok_or("No document ID in upload response")? .to_string(); diff --git a/tests/integration_performance_load_tests.rs b/tests/integration_performance_load_tests.rs index 45bda62..1588e59 100644 --- a/tests/integration_performance_load_tests.rs +++ b/tests/integration_performance_load_tests.rs @@ -239,11 +239,8 @@ impl LoadTestClient { return Err(format!("List documents failed: {}", response.text().await?).into()); } - let response_json: serde_json::Value = response.json().await?; - let documents = response_json["documents"].as_array() - .ok_or("Invalid response format: missing documents array")? - .clone(); - Ok((documents, elapsed)) + let documents_array: Vec = response.json().await?; + Ok((documents_array, elapsed)) } /// Perform a timed search request diff --git a/tests/integration_role_based_access_control_tests.rs b/tests/integration_role_based_access_control_tests.rs index 942461f..04859cf 100644 --- a/tests/integration_role_based_access_control_tests.rs +++ b/tests/integration_role_based_access_control_tests.rs @@ -223,10 +223,7 @@ impl RBACTestClient { return Err(format!("Get documents failed: {}", response.text().await?).into()); } - let response_json: Value = response.json().await?; - let documents: Vec = serde_json::from_value( - response_json["documents"].clone() - )?; + let documents: Vec = response.json().await?; Ok(documents) }