use readur::test_utils::TestContext;
use sqlx::{PgPool, Row};
use uuid::Uuid;
use std::collections::HashMap;

#[cfg(test)]
mod comprehensive_migration_tests {
    use super::*;

    #[tokio::test]
    async fn test_migration_with_prefilled_data() {
        let ctx = TestContext::new().await;
        let pool = ctx.state.db.get_pool();

        // Step 1: Prefill the database with test data
        let test_data = prefill_test_data(pool).await;

        // Step 2: Verify the prefilled data exists
        verify_prefilled_data(pool, &test_data).await;

        // Step 3: Simulate and test the failed documents migration
        test_failed_documents_migration(pool, &test_data).await;

        // Step 4: Verify schema integrity after migration
        verify_schema_integrity(pool).await;

        // Step 5: Test data consistency after migration
        verify_data_consistency_after_migration(pool, &test_data).await;
    }

    #[tokio::test]
    async fn test_migration_preserves_data_integrity() {
        let ctx = TestContext::new().await;
        let pool = ctx.state.db.get_pool();

        // Create comprehensive test data covering all edge cases
        let user_id = create_test_user(pool).await;

        // Use unique test identifier to avoid conflicts with other tests
        let test_id = Uuid::new_v4().to_string()[..8].to_string();

        // Pre-create the filenames to avoid borrowing issues
        let normal_success_filename = format!("data_integrity_test_{}_normal_success.pdf", test_id);
        let low_confidence_filename = format!("data_integrity_test_{}_low_confidence_fail.pdf", test_id);
        let timeout_filename = format!("data_integrity_test_{}_timeout_fail.pdf", test_id);
        let memory_filename = format!("data_integrity_test_{}_memory_fail.pdf", test_id);
        let corrupted_filename = format!("data_integrity_test_{}_corrupted_file.pdf", test_id);
        let unsupported_filename = format!("data_integrity_test_{}_unsupported.xyz", test_id);
        let pending_filename = format!("data_integrity_test_{}_pending_ocr.pdf", test_id);

        // Insert various types of documents
        let document_scenarios = vec![
            DocumentScenario {
                filename: normal_success_filename,
                ocr_status: "completed".to_string(),
                ocr_failure_reason: None,
                ocr_error: None,
                ocr_confidence: Some(0.95),
                ocr_text: Some("This is a successful OCR".to_string()),
                file_size: 1024,
            },
            DocumentScenario {
                filename: low_confidence_filename,
                ocr_status: "failed".to_string(),
                ocr_failure_reason: Some("low_ocr_confidence".to_string()),
                ocr_error: Some("OCR confidence below threshold".to_string()),
                ocr_confidence: Some(0.3),
                ocr_text: Some("Partially recognized text".to_string()),
                file_size: 2048,
            },
            DocumentScenario {
                filename: timeout_filename,
                ocr_status: "failed".to_string(),
                ocr_failure_reason: Some("timeout".to_string()),
                ocr_error: Some("OCR processing timed out after 60 seconds".to_string()),
                ocr_confidence: None,
                ocr_text: None,
                file_size: 10485760, // 10MB
            },
            DocumentScenario {
                filename: memory_filename,
                ocr_status: "failed".to_string(),
                ocr_failure_reason: Some("memory_limit".to_string()),
                ocr_error: Some("Memory limit exceeded".to_string()),
                ocr_confidence: None,
                ocr_text: None,
                file_size: 52428800, // 50MB
            },
            DocumentScenario {
                filename: corrupted_filename,
                ocr_status: "failed".to_string(),
                ocr_failure_reason: Some("file_corrupted".to_string()),
                ocr_error: Some("PDF file appears to be corrupted".to_string()),
                ocr_confidence: None,
                ocr_text: None,
                file_size: 512,
            },
            DocumentScenario {
                filename: unsupported_filename,
                ocr_status: "failed".to_string(),
                ocr_failure_reason: Some("unsupported_format".to_string()),
                ocr_error: Some("File format not supported".to_string()),
                ocr_confidence: None,
                ocr_text: None,
                file_size: 256,
            },
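            // Pending (not yet processed) document: the failed-documents migration should
            // leave this one untouched.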
            DocumentScenario {
                filename: pending_filename,
                ocr_status: "pending".to_string(),
                ocr_failure_reason: None,
                ocr_error: None,
                ocr_confidence: None,
                ocr_text: None,
                file_size: 4096,
            },
        ];

        // Insert all test documents
        let mut document_ids = HashMap::new();
        for scenario in &document_scenarios {
            let doc_id = insert_test_document(pool, user_id, scenario).await;
            document_ids.insert(scenario.filename.clone(), doc_id);
        }

        // Count documents before migration (only our test documents)
        let failed_count_before: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM documents WHERE ocr_status = 'failed' AND filename LIKE $1"
        )
        .bind(format!("data_integrity_test_{}_%", test_id))
        .fetch_one(pool)
        .await
        .unwrap();

        let successful_count_before: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM documents WHERE ocr_status = 'completed' AND filename LIKE $1"
        )
        .bind(format!("data_integrity_test_{}_%", test_id))
        .fetch_one(pool)
        .await
        .unwrap();

        // Verify the migration query works correctly (simulate the migration); restrict the
        // preview to this test's documents so concurrently running tests cannot skew the
        // mapping assertions below
        let migration_preview = sqlx::query(
            r#"
            SELECT d.filename, d.ocr_failure_reason,
                CASE
                    WHEN d.ocr_failure_reason = 'low_ocr_confidence' THEN 'low_ocr_confidence'
                    WHEN d.ocr_failure_reason = 'timeout' THEN 'ocr_timeout'
                    WHEN d.ocr_failure_reason = 'memory_limit' THEN 'ocr_memory_limit'
                    WHEN d.ocr_failure_reason = 'pdf_parsing_error' THEN 'pdf_parsing_error'
                    WHEN d.ocr_failure_reason = 'corrupted' OR d.ocr_failure_reason = 'file_corrupted' THEN 'file_corrupted'
                    WHEN d.ocr_failure_reason = 'unsupported_format' THEN 'unsupported_format'
                    WHEN d.ocr_failure_reason = 'access_denied' THEN 'access_denied'
                    ELSE 'other'
                END as mapped_failure_reason
            FROM documents d
            WHERE d.ocr_status = 'failed' AND d.filename LIKE $1
            "#
        )
        .bind(format!("data_integrity_test_{}_%", test_id))
        .fetch_all(pool)
        .await
        .unwrap();

        // Verify mappings are correct
        for row in migration_preview {
            let filename: String = row.get("filename");
            let original_reason: Option<String> = row.get("ocr_failure_reason");
            let mapped_reason: String = row.get("mapped_failure_reason");
            println!("Migration mapping: {} - {:?} -> {}", filename, original_reason, mapped_reason);

            // Verify specific mappings
            match original_reason.as_deref() {
                Some("low_ocr_confidence") => assert_eq!(mapped_reason, "low_ocr_confidence"),
                Some("timeout") => assert_eq!(mapped_reason, "ocr_timeout"),
                Some("memory_limit") => assert_eq!(mapped_reason, "ocr_memory_limit"),
                Some("file_corrupted") => assert_eq!(mapped_reason, "file_corrupted"),
                Some("unsupported_format") => assert_eq!(mapped_reason, "unsupported_format"),
                _ => assert_eq!(mapped_reason, "other"),
            }
        }

        // Verify that successful and pending documents are not affected
        assert_eq!(successful_count_before, 1, "Should have 1 successful document");
        // Exactly 5 failed documents were inserted with this test's prefix
        assert_eq!(failed_count_before, 5, "Should have 5 failed documents");
    }

    #[tokio::test]
    async fn test_migration_with_ocr_queue_data() {
        let ctx = TestContext::new().await;
        let pool = ctx.state.db.get_pool();
        let user_id = create_test_user(pool).await;

        // Create a document with OCR queue history
        let doc_id = Uuid::new_v4();
        sqlx::query(
            r#"
            INSERT INTO documents (id, user_id, filename, original_filename, file_path, file_size, mime_type, ocr_status, ocr_failure_reason, ocr_error)
            VALUES ($1, $2, $3, $3, '/test/path', 1000, 'application/pdf', 'failed', 'timeout', 'OCR timeout after retries')
            "#
        )
        .bind(doc_id)
        .bind(user_id)
        .bind("retry_test.pdf")
        .execute(pool)
        .await
        .unwrap();

        // Add OCR queue entries to simulate retry history
        for i in 0..3 {
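            // Two of the three queue entries are recorded as failed attempts; the third is
            // still processing, so only the failed ones count as retries in the query below.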
            sqlx::query(
                r#"
                INSERT INTO ocr_queue (document_id, priority, status, error_message, created_at)
                VALUES ($1, $2, $3, $4, NOW() - INTERVAL '1 hour' * $5)
                "#
            )
            .bind(doc_id)
            .bind(1)
            .bind(if i < 2 { "failed" } else { "processing" })
            .bind(if i < 2 { Some("Retry attempt failed") } else { None })
            .bind((3 - i) as i32)
            .execute(pool)
            .await
            .unwrap();
        }

        // Test the migration query with retry count
        let result = sqlx::query(
            r#"
            SELECT d.filename, d.ocr_failure_reason,
                COALESCE(q.retry_count, 0) as retry_count
            FROM documents d
            LEFT JOIN (
                SELECT document_id, COUNT(*) as retry_count
                FROM ocr_queue
                WHERE status IN ('failed', 'completed')
                GROUP BY document_id
            ) q ON d.id = q.document_id
            WHERE d.id = $1
            "#
        )
        .bind(doc_id)
        .fetch_one(pool)
        .await
        .unwrap();

        let retry_count: i64 = result.get("retry_count");
        assert_eq!(retry_count, 2, "Should have 2 failed retry attempts");
    }

    #[tokio::test]
    async fn test_migration_handles_null_values() {
        let ctx = TestContext::new().await;
        let pool = ctx.state.db.get_pool();
        let user_id = create_test_user(pool).await;

        // Insert documents with various NULL values
        let null_scenarios = vec![
            ("null_reason.pdf", None, Some("Error without reason")),
            ("null_error.pdf", Some("unknown"), None),
            ("all_nulls.pdf", None, None),
        ];

        for (filename, reason, error) in &null_scenarios {
            sqlx::query(
                r#"
                INSERT INTO documents (user_id, filename, original_filename, file_path, file_size, mime_type, ocr_status, ocr_failure_reason, ocr_error)
                VALUES ($1, $2, $2, '/test/path', 1000, 'application/pdf', 'failed', $3, $4)
                "#
            )
            .bind(user_id)
            .bind(filename)
            .bind(reason)
            .bind(error)
            .execute(pool)
            .await
            .unwrap();
        }

        // Verify migration handles NULLs correctly
        let migrated_data = sqlx::query(
            r#"
            SELECT filename, ocr_failure_reason,
                CASE
                    WHEN ocr_failure_reason = 'low_ocr_confidence' THEN 'low_ocr_confidence'
                    WHEN ocr_failure_reason = 'timeout' THEN 'ocr_timeout'
                    WHEN ocr_failure_reason = 'memory_limit' THEN 'ocr_memory_limit'
                    WHEN ocr_failure_reason = 'pdf_parsing_error' THEN 'pdf_parsing_error'
                    WHEN ocr_failure_reason = 'corrupted' OR ocr_failure_reason = 'file_corrupted' THEN 'file_corrupted'
                    WHEN ocr_failure_reason = 'unsupported_format' THEN 'unsupported_format'
                    WHEN ocr_failure_reason = 'access_denied' THEN 'access_denied'
                    ELSE 'other'
                END as mapped_reason,
                ocr_error
            FROM documents
            WHERE user_id = $1 AND ocr_status = 'failed'
            ORDER BY filename
            "#
        )
        .bind(user_id)
        .fetch_all(pool)
        .await
        .unwrap();

        assert_eq!(migrated_data.len(), 3);
        for row in migrated_data {
            let mapped_reason: String = row.get("mapped_reason");
            assert_eq!(mapped_reason, "other", "NULL or unknown reasons should map to 'other'");
        }
    }

    #[tokio::test]
    async fn test_migration_performance_with_large_dataset() {
        let ctx = TestContext::new().await;
        let pool = ctx.state.db.get_pool();
        let user_id = create_test_user(pool).await;

        // Insert a large number of failed documents with unique naming
        let test_id = Uuid::new_v4().to_string()[..8].to_string();
        let batch_size = 100;
        let start_time = std::time::Instant::now();

        for batch in 0..10 {
            let mut query = String::from(
                "INSERT INTO documents (user_id, filename, original_filename, file_path, file_size, mime_type, ocr_status, ocr_failure_reason, ocr_error) VALUES "
            );
            let mut _values: Vec<String> = Vec::new();

            for i in 0..batch_size {
                let doc_num = batch * batch_size + i;
                let filename = format!("perf_migration_test_{}_bulk_doc_{}.pdf", test_id, doc_num);
                let reason = match doc_num % 5 {
                    0 => "low_ocr_confidence",
                    1 => "timeout",
                    2 => "memory_limit",
                    3 => "file_corrupted",
                    _ => "unknown_error",
                };

                if i > 0 {
                    query.push_str(", ");
                }
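                // filename and reason come from the loop counters above (no external input),
                // so splicing them into the SQL text is acceptable for this bulk test insert;
                // user_id remains a bind parameter.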
                query.push_str(&format!(
                    "($1, '{}', '{}', '/test/path', 1000, 'application/pdf', 'failed', '{}', 'Test error')",
                    filename, filename, reason
                ));
            }

            sqlx::query(&query)
                .bind(user_id)
                .execute(pool)
                .await
                .unwrap();
        }

        let insert_duration = start_time.elapsed();
        println!("Inserted 1000 documents in {:?}", insert_duration);

        // Measure migration query performance
        let migration_start = std::time::Instant::now();

        let count: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM documents WHERE ocr_status = 'failed' AND filename LIKE $1"
        )
        .bind(format!("perf_migration_test_{}_bulk_doc_%", test_id))
        .fetch_one(pool)
        .await
        .unwrap();

        assert_eq!(count, 1000, "Should have 1000 failed documents");

        // Simulate the migration SELECT
        let _migration_data = sqlx::query(
            r#"
            SELECT * FROM documents
            WHERE ocr_status = 'failed' AND filename LIKE $1
            "#
        )
        .bind(format!("perf_migration_test_{}_bulk_doc_%", test_id))
        .fetch_all(pool)
        .await
        .unwrap();

        let migration_duration = migration_start.elapsed();
        println!("Migration query completed in {:?}", migration_duration);

        // Performance assertion - migration should complete reasonably fast
        assert!(migration_duration.as_secs() < 5, "Migration query should complete within 5 seconds");
    }

    // Helper functions

    struct TestData {
        user_id: Uuid,
        document_ids: HashMap<String, Uuid>,
        failure_scenarios: Vec<(String, String, String)>,
    }

    struct DocumentScenario {
        filename: String,
        ocr_status: String,
        ocr_failure_reason: Option<String>,
        ocr_error: Option<String>,
        ocr_confidence: Option<f32>,
        ocr_text: Option<String>,
        file_size: i64,
    }

    async fn create_test_user(pool: &PgPool) -> Uuid {
        let user_id = Uuid::new_v4();
        let unique_suffix = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let username = format!("test_migration_user_{}", unique_suffix);
        let email = format!("test_migration_{}@example.com", unique_suffix);

        sqlx::query(
            "INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)"
        )
        .bind(user_id)
        .bind(&username)
        .bind(&email)
        .bind("test_hash")
        .bind("user")
        .execute(pool)
        .await
        .unwrap();

        user_id
    }

    async fn insert_test_document(pool: &PgPool, user_id: Uuid, scenario: &DocumentScenario) -> Uuid {
        let doc_id = Uuid::new_v4();

        sqlx::query(
            r#"
            INSERT INTO documents (
                id, user_id, filename, original_filename, file_path, file_size,
                mime_type, ocr_status, ocr_failure_reason, ocr_error, ocr_confidence, ocr_text
            ) VALUES (
                $1, $2, $3, $3, '/test/path', $4, $5, $6, $7, $8, $9, $10
            )
            "#
        )
        .bind(doc_id)
        .bind(user_id)
        .bind(&scenario.filename)
        .bind(scenario.file_size)
        .bind(if scenario.filename.ends_with(".pdf") { "application/pdf" } else { "application/octet-stream" })
        .bind(&scenario.ocr_status)
        .bind(scenario.ocr_failure_reason.as_ref())
        .bind(scenario.ocr_error.as_ref())
        .bind(scenario.ocr_confidence)
        .bind(scenario.ocr_text.as_ref())
        .execute(pool)
        .await
        .unwrap();

        doc_id
    }

    async fn prefill_test_data(pool: &PgPool) -> TestData {
        let user_id = create_test_user(pool).await;
        let mut document_ids = HashMap::new();

        // Use unique test identifier to avoid conflicts with other tests
        let test_id = Uuid::new_v4().to_string()[..8].to_string();

        let failure_scenarios = vec![
            (format!("comp_migration_test_{}_timeout_doc.pdf", test_id), "timeout".to_string(), "OCR processing timed out".to_string()),
            (format!("comp_migration_test_{}_memory_doc.pdf", test_id), "memory_limit".to_string(), "Memory limit exceeded".to_string()),
            (format!("comp_migration_test_{}_corrupt_doc.pdf", test_id), "file_corrupted".to_string(), "File is corrupted".to_string()),
            (format!("comp_migration_test_{}_low_conf_doc.pdf", test_id), "low_ocr_confidence".to_string(), "Confidence too low".to_string()),
        ];

        // Insert test documents
        for (filename, reason, error) in &failure_scenarios {
            let doc_id = Uuid::new_v4();

            sqlx::query(
                r#"
                INSERT INTO documents (
                    id, user_id, filename, original_filename, file_path, file_size,
                    mime_type, ocr_status, ocr_failure_reason, ocr_error
                ) VALUES (
                    $1, $2, $3, $3, '/test/path', 1000, 'application/pdf', 'failed', $4, $5
                )
                "#
            )
            .bind(doc_id)
            .bind(user_id)
            .bind(filename)
            .bind(reason)
            .bind(error)
            .execute(pool)
            .await
            .unwrap();

            document_ids.insert(filename.clone(), doc_id);
        }

        TestData {
            user_id,
            document_ids,
            failure_scenarios,
        }
    }

    async fn verify_prefilled_data(pool: &PgPool, test_data: &TestData) {
        let count: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM documents WHERE user_id = $1 AND ocr_status = 'failed' AND filename LIKE 'comp_migration_test_%'"
        )
        .bind(test_data.user_id)
        .fetch_one(pool)
        .await
        .unwrap();

        assert_eq!(count, test_data.failure_scenarios.len() as i64, "All test documents should be inserted");
    }

    async fn test_failed_documents_migration(pool: &PgPool, test_data: &TestData) {
        // Simulate the migration
        let result = sqlx::query(
            r#"
            INSERT INTO failed_documents (
                user_id, filename, original_filename, file_path, file_size,
                mime_type, error_message, failure_reason, failure_stage, ingestion_source
            )
            SELECT
                d.user_id,
                d.filename,
                d.original_filename,
                d.file_path,
                d.file_size,
                d.mime_type,
                d.ocr_error,
                CASE
                    WHEN d.ocr_failure_reason = 'low_ocr_confidence' THEN 'low_ocr_confidence'
                    WHEN d.ocr_failure_reason = 'timeout' THEN 'ocr_timeout'
                    WHEN d.ocr_failure_reason = 'memory_limit' THEN 'ocr_memory_limit'
                    WHEN d.ocr_failure_reason = 'pdf_parsing_error' THEN 'pdf_parsing_error'
                    WHEN d.ocr_failure_reason = 'corrupted' OR d.ocr_failure_reason = 'file_corrupted' THEN 'file_corrupted'
                    WHEN d.ocr_failure_reason = 'unsupported_format' THEN 'unsupported_format'
                    WHEN d.ocr_failure_reason = 'access_denied' THEN 'access_denied'
                    ELSE 'other'
                END as failure_reason,
                'ocr' as failure_stage,
                'test_migration' as ingestion_source
            FROM documents d
            WHERE d.ocr_status = 'failed' AND d.user_id = $1
            "#
        )
        .bind(test_data.user_id)
        .execute(pool)
        .await;

        assert!(result.is_ok(), "Migration should succeed");

        // Verify all documents were migrated
        let migrated_count: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM failed_documents WHERE user_id = $1 AND ingestion_source = 'test_migration'"
        )
        .bind(test_data.user_id)
        .fetch_one(pool)
        .await
        .unwrap();

        assert_eq!(migrated_count, test_data.failure_scenarios.len() as i64, "All failed documents should be migrated");
    }

    async fn verify_schema_integrity(pool: &PgPool) {
        // Check that all expected tables exist
        let tables = sqlx::query(
            "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"
        )
        .fetch_all(pool)
        .await
        .unwrap();

        let table_names: Vec<String> = tables.iter()
            .map(|row| row.get("table_name"))
            .collect();

        assert!(table_names.contains(&"documents".to_string()));
        assert!(table_names.contains(&"failed_documents".to_string()));
        assert!(table_names.contains(&"users".to_string()));
        assert!(table_names.contains(&"ocr_queue".to_string()));

        // Check that constraints exist on failed_documents
        let constraints = sqlx::query(
            r#"
            SELECT constraint_name, constraint_type
            FROM information_schema.table_constraints
            WHERE table_name = 'failed_documents' AND constraint_type = 'CHECK'
            "#
        )
        .fetch_all(pool)
        .await
        .unwrap();

        let constraint_names: Vec<String> = constraints.iter()
            .map(|row| row.get("constraint_name"))
            .collect();

        // Constraint names are matched by substring because the exact names depend on how
        // the migration (or Postgres) named the CHECK constraints.
        assert!(constraint_names.iter().any(|name| name.contains("failure_reason")),
            "Should have check constraint for failure_reason");
        assert!(constraint_names.iter().any(|name| name.contains("failure_stage")),
            "Should have check constraint for failure_stage");
    }

    async fn verify_data_consistency_after_migration(pool: &PgPool, test_data: &TestData) {
        // Create mappings based on the actual filenames in test_data
        let mut mappings = Vec::new();
        for (filename, reason, _) in &test_data.failure_scenarios {
            let expected_reason = match reason.as_str() {
                "timeout" => "ocr_timeout",
                "memory_limit" => "ocr_memory_limit",
                "file_corrupted" => "file_corrupted",
                "low_ocr_confidence" => "low_ocr_confidence",
                _ => reason.as_str(),
            };
            mappings.push((filename.as_str(), expected_reason));
        }

        for (filename, expected_reason) in mappings {
            let result = sqlx::query(
                "SELECT failure_reason FROM failed_documents WHERE filename = $1 AND user_id = $2"
            )
            .bind(filename)
            .bind(test_data.user_id)
            .fetch_optional(pool)
            .await
            .unwrap();

            assert!(result.is_some(), "Document {} should exist in failed_documents", filename);

            let actual_reason: String = result.unwrap().get("failure_reason");
            assert_eq!(actual_reason, expected_reason,
                "Failure reason for {} should be mapped correctly", filename);
        }

        // Verify all migrated documents have proper metadata
        let all_migrated = sqlx::query(
            "SELECT * FROM failed_documents WHERE user_id = $1"
        )
        .bind(test_data.user_id)
        .fetch_all(pool)
        .await
        .unwrap();

        for row in all_migrated {
            let failure_stage: String = row.get("failure_stage");
            assert_eq!(failure_stage, "ocr", "All migrated documents should have 'ocr' as failure_stage");

            let filename: String = row.get("filename");
            assert!(test_data.document_ids.contains_key(&filename),
                "Migrated document should be from our test data");
        }
    }
}