/*! * Universal Source Sync Service Unit Tests * * Tests for the universal source sync service that handles: * - Multiple source types (WebDAV, Local Folder, S3) * - Generic sync operations and dispatching * - File deduplication and content hashing * - OCR queue integration * - Error handling across source types * - Performance optimization and metrics */ use std::sync::Arc; use std::collections::HashMap; use uuid::Uuid; use chrono::Utc; use serde_json::json; use sha2::{Sha256, Digest}; use readur::{ AppState, config::Config, db::Database, models::{Source, SourceType, SourceStatus, WebDAVSourceConfig, LocalFolderSourceConfig, S3SourceConfig}, scheduling::source_sync::SourceSyncService, }; /// Create a test WebDAV source fn create_test_webdav_source() -> Source { Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), name: "Test WebDAV".to_string(), source_type: SourceType::WebDAV, enabled: true, config: json!({ "server_url": "https://cloud.example.com", "username": "testuser", "password": "testpass", "watch_folders": ["/Documents"], "file_extensions": [".pdf", ".txt"], "auto_sync": true, "sync_interval_minutes": 60, "server_type": "nextcloud" }), status: SourceStatus::Idle, last_sync_at: None, last_error: None, last_error_at: None, total_files_synced: 0, total_files_pending: 0, total_size_bytes: 0, created_at: Utc::now(), updated_at: Utc::now(), validation_status: None, last_validation_at: None, validation_score: None, validation_issues: None, } } /// Create a test Local Folder source fn create_test_local_source() -> Source { Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), name: "Test Local Folder".to_string(), source_type: SourceType::LocalFolder, enabled: true, config: json!({ "watch_folders": ["/home/user/documents"], "recursive": true, "follow_symlinks": false, "auto_sync": true, "sync_interval_minutes": 30, "file_extensions": [".pdf", ".txt", ".jpg"] }), status: SourceStatus::Idle, last_sync_at: None, last_error: None, last_error_at: None, total_files_synced: 0, total_files_pending: 0, total_size_bytes: 0, created_at: Utc::now(), updated_at: Utc::now(), validation_status: None, last_validation_at: None, validation_score: None, validation_issues: None, } } /// Create a test S3 source fn create_test_s3_source() -> Source { Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), name: "Test S3".to_string(), source_type: SourceType::S3, enabled: true, config: json!({ "bucket_name": "test-documents", "region": "us-east-1", "access_key_id": "AKIATEST", "secret_access_key": "secrettest", "prefix": "documents/", "watch_folders": ["documents/"], "auto_sync": true, "sync_interval_minutes": 120, "file_extensions": [".pdf", ".docx"] }), status: SourceStatus::Idle, last_sync_at: None, last_error: None, last_error_at: None, total_files_synced: 0, total_files_pending: 0, total_size_bytes: 0, created_at: Utc::now(), updated_at: Utc::now(), validation_status: None, last_validation_at: None, validation_score: None, validation_issues: None, } } async fn create_test_app_state() -> Arc { let database_url = std::env::var("TEST_DATABASE_URL") .or_else(|_| std::env::var("DATABASE_URL")) .unwrap_or_else(|_| "postgresql://readur:readur@localhost:5432/readur".to_string()); let config = Config { database_url, server_address: "127.0.0.1:8080".to_string(), jwt_secret: "test_secret".to_string(), upload_path: "/tmp/test_uploads".to_string(), watch_folder: "/tmp/test_watch".to_string(), user_watch_base_dir: "./user_watch".to_string(), enable_per_user_watch: false, allowed_file_types: vec!["pdf".to_string(), "txt".to_string()], watch_interval_seconds: Some(30), file_stability_check_ms: Some(500), max_file_age_hours: None, ocr_language: "eng".to_string(), concurrent_ocr_jobs: 2, ocr_timeout_seconds: 60, max_file_size_mb: 10, memory_limit_mb: 256, cpu_priority: "normal".to_string(), oidc_enabled: false, oidc_client_id: None, oidc_client_secret: None, oidc_issuer_url: None, oidc_redirect_uri: None, oidc_auto_register: None, allow_local_auth: None, s3_enabled: false, s3_config: None, max_pdf_size_mb: 100, max_office_document_size_mb: 100, }; let db = Database::new(&config.database_url).await.unwrap(); // Create file service let storage_config = readur::storage::StorageConfig::Local { upload_path: config.upload_path.clone() }; let storage_backend = readur::storage::factory::create_storage_backend(storage_config).await.unwrap(); let file_service = std::sync::Arc::new(readur::services::file_service::FileService::with_storage(config.upload_path.clone(), storage_backend)); let queue_service = Arc::new(readur::ocr::queue::OcrQueueService::new(db.clone(), db.pool.clone(), 2, file_service.clone(), 100, 100)); Arc::new(AppState { db, config, file_service, webdav_scheduler: None, source_scheduler: None, queue_service, oidc_client: None, sync_progress_tracker: Arc::new(readur::services::sync_progress_tracker::SyncProgressTracker::new()), user_watch_service: None, webdav_metrics_collector: None, }) } #[tokio::test] async fn test_source_sync_service_creation() { let state = create_test_app_state().await; let sync_service = SourceSyncService::new(state.clone()); // Test that sync service is created successfully // We can't access private fields, but we can test public interface assert!(true); // Service creation succeeded } #[test] fn test_source_type_detection() { let webdav_source = create_test_webdav_source(); let local_source = create_test_local_source(); let s3_source = create_test_s3_source(); assert_eq!(webdav_source.source_type, SourceType::WebDAV); assert_eq!(local_source.source_type, SourceType::LocalFolder); assert_eq!(s3_source.source_type, SourceType::S3); // Test string representations assert_eq!(webdav_source.source_type.to_string(), "webdav"); assert_eq!(local_source.source_type.to_string(), "local_folder"); assert_eq!(s3_source.source_type.to_string(), "s3"); } #[test] fn test_config_parsing_webdav() { let source = create_test_webdav_source(); let config: Result = serde_json::from_value(source.config.clone()); assert!(config.is_ok(), "WebDAV config should parse successfully"); let webdav_config = config.unwrap(); assert_eq!(webdav_config.server_url, "https://cloud.example.com"); assert_eq!(webdav_config.username, "testuser"); assert!(webdav_config.auto_sync); assert_eq!(webdav_config.sync_interval_minutes, 60); assert_eq!(webdav_config.file_extensions.len(), 2); } #[test] fn test_config_parsing_local_folder() { let source = create_test_local_source(); let config: Result = serde_json::from_value(source.config.clone()); assert!(config.is_ok(), "Local Folder config should parse successfully"); let local_config = config.unwrap(); assert_eq!(local_config.watch_folders.len(), 1); assert_eq!(local_config.watch_folders[0], "/home/user/documents"); assert!(local_config.recursive); assert!(!local_config.follow_symlinks); assert_eq!(local_config.sync_interval_minutes, 30); } #[test] fn test_config_parsing_s3() { let source = create_test_s3_source(); let config: Result = serde_json::from_value(source.config.clone()); assert!(config.is_ok(), "S3 config should parse successfully"); let s3_config = config.unwrap(); assert_eq!(s3_config.bucket_name, "test-documents"); assert_eq!(s3_config.region, "us-east-1"); assert_eq!(s3_config.prefix, Some("documents/".to_string())); assert_eq!(s3_config.sync_interval_minutes, 120); assert_eq!(s3_config.watch_folders.len(), 1); assert_eq!(s3_config.watch_folders[0], "documents/"); } #[test] fn test_file_deduplication_logic() { // Test SHA256-based file deduplication let file_content_1 = b"This is test file content for deduplication"; let file_content_2 = b"This is different file content"; let file_content_3 = b"This is test file content for deduplication"; // Same as 1 let hash_1 = calculate_content_hash(file_content_1); let hash_2 = calculate_content_hash(file_content_2); let hash_3 = calculate_content_hash(file_content_3); assert_ne!(hash_1, hash_2, "Different content should have different hashes"); assert_eq!(hash_1, hash_3, "Same content should have same hashes"); // Test hash format assert_eq!(hash_1.len(), 64); // SHA256 hex string length assert!(hash_1.chars().all(|c| c.is_ascii_hexdigit()), "Hash should be valid hex"); } fn calculate_content_hash(content: &[u8]) -> String { let mut hasher = Sha256::new(); hasher.update(content); format!("{:x}", hasher.finalize()) } #[test] fn test_sync_metrics_structure() { let metrics = SyncMetrics { source_id: Uuid::new_v4(), source_type: SourceType::WebDAV, files_discovered: 100, files_downloaded: 85, files_skipped_existing: 10, files_skipped_extension: 3, files_failed: 2, total_bytes_downloaded: 50_000_000, // 50MB sync_duration_ms: 45_000, // 45 seconds ocr_jobs_queued: 75, errors: vec![ SyncError { file_path: "/Documents/failed.pdf".to_string(), error_message: "Network timeout".to_string(), error_code: "TIMEOUT".to_string(), } ], }; assert_eq!(metrics.source_type, SourceType::WebDAV); assert_eq!(metrics.files_discovered, 100); assert_eq!(metrics.files_downloaded, 85); // Test calculated metrics let total_processed = metrics.files_downloaded + metrics.files_skipped_existing + metrics.files_skipped_extension + metrics.files_failed; assert_eq!(total_processed, metrics.files_discovered); let success_rate = (metrics.files_downloaded as f64 / metrics.files_discovered as f64) * 100.0; assert_eq!(success_rate, 85.0); // Test throughput calculation let mb_per_second = (metrics.total_bytes_downloaded as f64 / 1_000_000.0) / (metrics.sync_duration_ms as f64 / 1000.0); assert!(mb_per_second > 0.0); } #[derive(Debug, Clone)] struct SyncMetrics { source_id: Uuid, source_type: SourceType, files_discovered: u32, files_downloaded: u32, files_skipped_existing: u32, files_skipped_extension: u32, files_failed: u32, total_bytes_downloaded: u64, sync_duration_ms: u64, ocr_jobs_queued: u32, errors: Vec, } #[derive(Debug, Clone)] struct SyncError { file_path: String, error_message: String, error_code: String, } #[test] fn test_ocr_queue_integration() { // Test OCR job creation for different file types let test_files = vec![ ("document.pdf", true), // Should queue for OCR ("image.jpg", true), // Should queue for OCR ("image.png", true), // Should queue for OCR ("text.txt", false), // Plain text, no OCR needed ("data.json", false), // JSON, no OCR needed ("archive.zip", false), // Archive, no OCR needed ]; for (filename, should_queue_ocr) in test_files { let needs_ocr = file_needs_ocr(filename); assert_eq!(needs_ocr, should_queue_ocr, "OCR queueing decision wrong for: {}", filename); } } fn file_needs_ocr(filename: &str) -> bool { let ocr_extensions = vec![".pdf", ".jpg", ".jpeg", ".png", ".tiff", ".bmp"]; let extension = extract_extension(filename); ocr_extensions.contains(&extension.as_str()) } fn extract_extension(filename: &str) -> String { if let Some(pos) = filename.rfind('.') { filename[pos..].to_lowercase() } else { String::new() } } #[test] fn test_sync_cancellation_handling() { // Test sync cancellation logic use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; let cancellation_token = Arc::new(AtomicBool::new(false)); // Test normal operation assert!(!cancellation_token.load(Ordering::Relaxed)); // Simulate cancellation request cancellation_token.store(true, Ordering::Relaxed); assert!(cancellation_token.load(Ordering::Relaxed)); // Test that sync would respect cancellation let should_continue = !cancellation_token.load(Ordering::Relaxed); assert!(!should_continue, "Sync should stop when cancelled"); // Test cancellation cleanup cancellation_token.store(false, Ordering::Relaxed); assert!(!cancellation_token.load(Ordering::Relaxed)); } #[test] fn test_error_classification() { let test_errors = vec![ ("Connection timeout", ErrorCategory::Network), ("DNS resolution failed", ErrorCategory::Network), ("HTTP 401 Unauthorized", ErrorCategory::Authentication), ("HTTP 403 Forbidden", ErrorCategory::Authentication), ("HTTP 404 Not Found", ErrorCategory::NotFound), ("HTTP 500 Internal Server Error", ErrorCategory::Server), ("Disk full", ErrorCategory::Storage), ("Permission denied", ErrorCategory::Permission), ("Invalid file format", ErrorCategory::Format), ("Unknown error", ErrorCategory::Unknown), ]; for (error_message, expected_category) in test_errors { let category = classify_error(error_message); assert_eq!(category, expected_category, "Error classification failed for: {}", error_message); } } #[derive(Debug, Clone, PartialEq)] enum ErrorCategory { Network, Authentication, NotFound, Server, Storage, Permission, Format, Unknown, } fn classify_error(error_message: &str) -> ErrorCategory { let msg = error_message.to_lowercase(); if msg.contains("timeout") || msg.contains("dns") || msg.contains("connection") { ErrorCategory::Network } else if msg.contains("401") || msg.contains("403") || msg.contains("unauthorized") || msg.contains("forbidden") { ErrorCategory::Authentication } else if msg.contains("404") || msg.contains("not found") { ErrorCategory::NotFound } else if msg.contains("500") || msg.contains("internal server") { ErrorCategory::Server } else if msg.contains("disk full") || msg.contains("storage") { ErrorCategory::Storage } else if msg.contains("permission denied") || msg.contains("access denied") { ErrorCategory::Permission } else if msg.contains("invalid file") || msg.contains("format") { ErrorCategory::Format } else { ErrorCategory::Unknown } } #[test] fn test_retry_strategy() { // Test retry strategy for different error types let retry_configs = vec![ (ErrorCategory::Network, 3, true), // Retry network errors (ErrorCategory::Server, 2, true), // Retry server errors (ErrorCategory::Authentication, 0, false), // Don't retry auth errors (ErrorCategory::NotFound, 0, false), // Don't retry not found (ErrorCategory::Permission, 0, false), // Don't retry permission errors (ErrorCategory::Format, 0, false), // Don't retry format errors ]; for (error_category, expected_retries, should_retry) in retry_configs { let retry_count = get_retry_count_for_error(&error_category); let will_retry = retry_count > 0; assert_eq!(retry_count, expected_retries); assert_eq!(will_retry, should_retry, "Retry decision wrong for: {:?}", error_category); } } fn get_retry_count_for_error(error_category: &ErrorCategory) -> u32 { match error_category { ErrorCategory::Network => 3, ErrorCategory::Server => 2, ErrorCategory::Storage => 1, _ => 0, // Don't retry other types } } #[test] fn test_sync_performance_monitoring() { // Test performance monitoring metrics let performance_data = SyncPerformanceData { throughput_mbps: 5.2, files_per_second: 2.8, avg_file_size_mb: 1.8, memory_usage_mb: 45.6, cpu_usage_percent: 12.3, network_latency_ms: 85, error_rate_percent: 2.1, }; // Test performance thresholds assert!(performance_data.throughput_mbps > 1.0, "Throughput should be reasonable"); assert!(performance_data.files_per_second > 0.5, "File processing rate should be reasonable"); assert!(performance_data.memory_usage_mb < 500.0, "Memory usage should be reasonable"); assert!(performance_data.cpu_usage_percent < 80.0, "CPU usage should be reasonable"); assert!(performance_data.network_latency_ms < 1000, "Network latency should be reasonable"); assert!(performance_data.error_rate_percent < 10.0, "Error rate should be low"); } #[derive(Debug, Clone)] struct SyncPerformanceData { throughput_mbps: f64, files_per_second: f64, avg_file_size_mb: f64, memory_usage_mb: f64, cpu_usage_percent: f64, network_latency_ms: u64, error_rate_percent: f64, } #[test] fn test_source_priority_handling() { // Test priority-based source processing let sources = vec![ (SourceType::LocalFolder, 1), // Highest priority (local is fastest) (SourceType::WebDAV, 2), // Medium priority (SourceType::S3, 3), // Lower priority (remote with potential costs) ]; let mut sorted_sources = sources.clone(); sorted_sources.sort_by_key(|(_, priority)| *priority); assert_eq!(sorted_sources[0].0, SourceType::LocalFolder); assert_eq!(sorted_sources[1].0, SourceType::WebDAV); assert_eq!(sorted_sources[2].0, SourceType::S3); // Test that local sources are processed first let local_priority = get_source_priority(&SourceType::LocalFolder); let webdav_priority = get_source_priority(&SourceType::WebDAV); let s3_priority = get_source_priority(&SourceType::S3); assert!(local_priority < webdav_priority); assert!(webdav_priority < s3_priority); } fn get_source_priority(source_type: &SourceType) -> u32 { match source_type { SourceType::LocalFolder => 1, // Highest priority SourceType::WebDAV => 2, // Medium priority SourceType::S3 => 3, // Lower priority } } #[test] fn test_concurrent_sync_protection() { use std::sync::{Arc, Mutex}; use std::collections::HashSet; // Test that only one sync per source can run at a time let active_syncs: Arc>> = Arc::new(Mutex::new(HashSet::new())); let source_id_1 = Uuid::new_v4(); let source_id_2 = Uuid::new_v4(); // Test adding first sync { let mut syncs = active_syncs.lock().unwrap(); assert!(syncs.insert(source_id_1)); } // Test adding second sync (different source) { let mut syncs = active_syncs.lock().unwrap(); assert!(syncs.insert(source_id_2)); } // Test preventing duplicate sync for same source { let mut syncs = active_syncs.lock().unwrap(); assert!(!syncs.insert(source_id_1)); // Should fail } // Test cleanup after sync completion { let mut syncs = active_syncs.lock().unwrap(); assert!(syncs.remove(&source_id_1)); assert!(!syncs.remove(&source_id_1)); // Should fail second time } } #[test] fn test_sync_state_transitions() { // Test valid state transitions during sync let valid_transitions = vec![ (SourceStatus::Idle, SourceStatus::Syncing), (SourceStatus::Syncing, SourceStatus::Idle), (SourceStatus::Syncing, SourceStatus::Error), (SourceStatus::Error, SourceStatus::Syncing), (SourceStatus::Error, SourceStatus::Idle), ]; for (from_state, to_state) in valid_transitions { assert!(is_valid_state_transition(&from_state, &to_state), "Invalid transition from {:?} to {:?}", from_state, to_state); } // Test invalid transitions let invalid_transitions = vec![ (SourceStatus::Idle, SourceStatus::Error), // Can't go directly to error without syncing ]; for (from_state, to_state) in invalid_transitions { assert!(!is_valid_state_transition(&from_state, &to_state), "Should not allow transition from {:?} to {:?}", from_state, to_state); } } fn is_valid_state_transition(from: &SourceStatus, to: &SourceStatus) -> bool { match (from, to) { (SourceStatus::Idle, SourceStatus::Syncing) => true, (SourceStatus::Syncing, SourceStatus::Idle) => true, (SourceStatus::Syncing, SourceStatus::Error) => true, (SourceStatus::Error, SourceStatus::Syncing) => true, (SourceStatus::Error, SourceStatus::Idle) => true, _ => false, } } #[test] fn test_bandwidth_limiting() { // Test bandwidth limiting calculations let bandwidth_limiter = BandwidthLimiter { max_mbps: 10.0, current_usage_mbps: 8.5, burst_allowance_mb: 50.0, current_burst_mb: 25.0, }; // Test if download should be throttled let should_throttle = bandwidth_limiter.should_throttle_download(5.0); // 5MB download assert!(!should_throttle, "Small download within burst allowance should not be throttled"); let should_throttle_large = bandwidth_limiter.should_throttle_download(30.0); // 30MB download assert!(should_throttle_large, "Large download exceeding burst should be throttled"); // Test delay calculation let delay_ms = bandwidth_limiter.calculate_delay_ms(1_000_000); // 1MB assert!(delay_ms > 0, "Should have some delay when near bandwidth limit"); } #[derive(Debug, Clone)] struct BandwidthLimiter { max_mbps: f64, current_usage_mbps: f64, burst_allowance_mb: f64, current_burst_mb: f64, } impl BandwidthLimiter { fn should_throttle_download(&self, download_size_mb: f64) -> bool { self.current_usage_mbps >= self.max_mbps * 0.8 && // Near limit download_size_mb > (self.burst_allowance_mb - self.current_burst_mb) } fn calculate_delay_ms(&self, bytes: u64) -> u64 { if self.current_usage_mbps < self.max_mbps * 0.8 { return 0; // No throttling needed } let mb = bytes as f64 / 1_000_000.0; let ideal_time_seconds = mb / self.max_mbps; (ideal_time_seconds * 1000.0) as u64 } }