Files
readur/tests/integration_webdav_concurrency_tests.rs

708 lines
28 KiB
Rust

use std::{sync::Arc, time::Duration, collections::HashMap};
use tokio::time::sleep;
use uuid::Uuid;
use futures::future::join_all;
use readur::{
AppState,
models::{CreateWebDAVDirectory, SourceType, SourceStatus, WebDAVSourceConfig, CreateSource},
test_utils::{TestContext, TestAuthHelper},
scheduling::source_scheduler::SourceScheduler,
services::webdav::{SmartSyncService, WebDAVService, WebDAVConfig, SyncProgress, SyncPhase},
};
/// Helper function to create test setup with database and real components
async fn create_integration_test_state() -> (TestContext, Arc<AppState>, Uuid) {
let test_context = TestContext::new().await;
let auth_helper = TestAuthHelper::new(test_context.app().clone());
let test_user = auth_helper.create_test_user().await;
let state = test_context.state().clone();
let user_id = test_user.user_response.id;
(test_context, state, user_id)
}
/// Helper to create a test WebDAV source
async fn create_test_webdav_source(
state: &Arc<AppState>,
user_id: Uuid,
name: &str,
auto_sync: bool,
) -> readur::models::Source {
let config = WebDAVSourceConfig {
server_url: "https://test.example.com".to_string(),
username: "testuser".to_string(),
password: "testpass".to_string(),
watch_folders: vec!["/test".to_string()],
file_extensions: vec!["pdf".to_string(), "txt".to_string()],
auto_sync,
sync_interval_minutes: 1, // Fast interval for testing
server_type: Some("nextcloud".to_string()),
};
let create_source = CreateSource {
name: name.to_string(),
source_type: SourceType::WebDAV,
config: serde_json::to_value(config).unwrap(),
enabled: Some(true),
};
state.db.create_source(user_id, &create_source).await
.expect("Failed to create test source")
}
/// Mock WebDAV service that simulates network operations with controllable delays
#[derive(Clone)]
struct MockWebDAVService {
delay_ms: u64,
should_fail: bool,
etag_counter: Arc<std::sync::Mutex<u32>>,
}
impl MockWebDAVService {
fn new(delay_ms: u64, should_fail: bool) -> Self {
Self {
delay_ms,
should_fail,
etag_counter: Arc::new(std::sync::Mutex::new(1)),
}
}
async fn mock_discover_files_and_directories(
&self,
directory_path: &str,
_recursive: bool,
) -> Result<readur::services::webdav::WebDAVDiscoveryResult, anyhow::Error> {
// Simulate network delay
sleep(Duration::from_millis(self.delay_ms)).await;
if self.should_fail {
return Err(anyhow::anyhow!("Mock WebDAV discovery failed"));
}
// Generate unique ETags for each call to simulate directory changes
let etag = {
let mut counter = self.etag_counter.lock().unwrap();
*counter += 1;
format!("mock-etag-{}", *counter)
};
let mock_files = vec![
readur::models::FileIngestionInfo {
name: format!("test-file-{}.pdf", etag),
relative_path: format!("{}/test-file-{}.pdf", directory_path, etag),
full_path: format!("{}/test-file-{}.pdf", directory_path, etag),
#[allow(deprecated)]
path: format!("{}/test-file-{}.pdf", directory_path, etag),
size: 1024,
mime_type: "application/pdf".to_string(),
last_modified: Some(chrono::Utc::now()),
etag: etag.clone(),
is_directory: false,
created_at: Some(chrono::Utc::now()),
permissions: Some(0o644),
owner: None,
group: None,
metadata: None,
}
];
let mock_directories = vec![
readur::models::FileIngestionInfo {
name: "subdir".to_string(),
relative_path: format!("{}/subdir", directory_path),
full_path: format!("{}/subdir", directory_path),
#[allow(deprecated)]
path: format!("{}/subdir", directory_path),
size: 0,
mime_type: "".to_string(),
last_modified: Some(chrono::Utc::now()),
etag: etag.clone(),
is_directory: true,
created_at: Some(chrono::Utc::now()),
permissions: Some(0o755),
owner: None,
group: None,
metadata: None,
}
];
Ok(readur::services::webdav::WebDAVDiscoveryResult {
files: mock_files,
directories: mock_directories,
})
}
}
/// Test concurrent source scheduler trigger operations
#[tokio::test]
async fn test_concurrent_source_scheduler_triggers() {
let (_test_context, state, user_id) = create_integration_test_state().await;
// Create test sources
let source1 = create_test_webdav_source(&state, user_id, "TestSource1", false).await;
let source2 = create_test_webdav_source(&state, user_id, "TestSource2", false).await;
// Create source scheduler
let scheduler = Arc::new(SourceScheduler::new(state.clone()));
// Test concurrent triggers of the same source
let concurrent_triggers = (0..5).map(|i| {
let scheduler_clone = scheduler.clone();
let source_id = source1.id;
tokio::spawn(async move {
println!("Trigger attempt {} for source {}", i, source_id);
// Try to trigger sync - some should succeed, others should get conflict
let result = scheduler_clone.trigger_sync(source_id).await;
println!("Trigger {} result: {:?}", i, result.is_ok());
(i, result)
})
});
// Wait for all trigger attempts
let trigger_results: Vec<_> = join_all(concurrent_triggers).await;
// Verify all tasks completed without panicking
for result in trigger_results {
let (task_id, sync_result) = result.expect("Task should complete without panicking");
println!("Task {} completed with result: {:?}", task_id, sync_result.is_ok());
// Note: The actual sync operations might fail due to concurrency control,
// but the scheduler should handle this gracefully
}
// Test concurrent triggers across different sources
let cross_source_triggers = vec![
(scheduler.clone(), source1.id),
(scheduler.clone(), source2.id),
(scheduler.clone(), source1.id), // Duplicate to test conflict handling
(scheduler.clone(), source2.id), // Duplicate to test conflict handling
]
.into_iter()
.enumerate()
.map(|(i, (scheduler_clone, source_id))| {
tokio::spawn(async move {
println!("Cross-source trigger {} for source {}", i, source_id);
let result = scheduler_clone.trigger_sync(source_id).await;
(i, source_id, result)
})
});
let cross_results: Vec<_> = join_all(cross_source_triggers).await;
// Verify cross-source operations
for result in cross_results {
let (task_id, source_id, sync_result) = result.expect("Cross-source task should complete");
println!("Cross-source task {} for source {} completed: {:?}", task_id, source_id, sync_result.is_ok());
}
// Give time for any background tasks to complete
sleep(Duration::from_millis(2000)).await; // Extended timeout
// Verify final source states are consistent
let mut final_source1 = state.db.get_source(user_id, source1.id).await
.expect("Failed to get source1")
.expect("Source1 should exist");
let mut final_source2 = state.db.get_source(user_id, source2.id).await
.expect("Failed to get source2")
.expect("Source2 should exist");
// If sources are still syncing, try force reset as failsafe
let scheduler_reset = SourceScheduler::new(state.clone());
if matches!(final_source1.status, SourceStatus::Syncing) {
println!("Source1 still syncing after 2s, attempting force reset...");
if let Err(e) = scheduler_reset.force_reset_source(source1.id).await {
println!("Force reset source1 failed: {}", e);
} else {
sleep(Duration::from_millis(100)).await;
final_source1 = state.db.get_source(user_id, source1.id).await
.expect("Failed to get source1")
.expect("Source1 should exist");
println!("Source1 status after force reset: {:?}", final_source1.status);
}
}
if matches!(final_source2.status, SourceStatus::Syncing) {
println!("Source2 still syncing after 2s, attempting force reset...");
if let Err(e) = scheduler_reset.force_reset_source(source2.id).await {
println!("Force reset source2 failed: {}", e);
} else {
sleep(Duration::from_millis(100)).await;
final_source2 = state.db.get_source(user_id, source2.id).await
.expect("Failed to get source2")
.expect("Source2 should exist");
println!("Source2 status after force reset: {:?}", final_source2.status);
}
}
// Sources should not be stuck in syncing state
assert_ne!(final_source1.status, SourceStatus::Syncing,
"Source1 should not be stuck in syncing state");
assert_ne!(final_source2.status, SourceStatus::Syncing,
"Source2 should not be stuck in syncing state");
}
/// Test concurrent smart sync evaluations
#[tokio::test]
async fn test_concurrent_smart_sync_evaluations() {
let (_test_context, state, user_id) = create_integration_test_state().await;
// Pre-populate some directory state
let test_directories = vec![
CreateWebDAVDirectory {
user_id,
directory_path: "/test".to_string(),
directory_etag: "initial-etag".to_string(),
file_count: 5,
total_size_bytes: 1024,
},
CreateWebDAVDirectory {
user_id,
directory_path: "/test/subdir1".to_string(),
directory_etag: "subdir1-etag".to_string(),
file_count: 3,
total_size_bytes: 512,
},
CreateWebDAVDirectory {
user_id,
directory_path: "/test/subdir2".to_string(),
directory_etag: "subdir2-etag".to_string(),
file_count: 2,
total_size_bytes: 256,
},
];
for dir in test_directories {
state.db.create_or_update_webdav_directory(&dir).await
.expect("Failed to create test directory");
}
// Create mock WebDAV services with different behaviors
let mock_services = vec![
MockWebDAVService::new(50, false), // Fast, successful
MockWebDAVService::new(100, false), // Slower, successful
MockWebDAVService::new(200, false), // Slowest, successful
MockWebDAVService::new(75, true), // Medium speed, fails
MockWebDAVService::new(25, false), // Fastest, successful
];
// Test concurrent smart sync evaluations
let concurrent_evaluations = mock_services.into_iter().enumerate().map(|(i, mock_service)| {
let state_clone = state.clone();
let user_id = user_id;
tokio::spawn(async move {
println!("Smart sync evaluation {} starting", i);
// Create SmartSyncService for this task
let smart_sync_service = SmartSyncService::new(state_clone.clone());
// Mock the WebDAV service call by calling the database methods directly
// This simulates what would happen during concurrent smart sync evaluations
// 1. Read current directory state (simulates smart sync evaluation)
let known_dirs_result = state_clone.db.list_webdav_directories(user_id).await;
// 2. Simulate discovery results with some delay
sleep(Duration::from_millis(mock_service.delay_ms)).await;
if mock_service.should_fail {
return (i, Err(anyhow::anyhow!("Mock evaluation failed")));
}
// 3. Update directory ETags (simulates directory changes being detected)
let update_result = state_clone.db.create_or_update_webdav_directory(
&CreateWebDAVDirectory {
user_id,
directory_path: "/test".to_string(),
directory_etag: format!("updated-etag-{}", i),
file_count: 5 + i as i64,
total_size_bytes: 1024 + (i as i64 * 100),
}
).await;
println!("Smart sync evaluation {} completed", i);
(i, Ok((known_dirs_result, update_result)))
})
});
// Wait for all evaluations
let evaluation_results: Vec<_> = join_all(concurrent_evaluations).await;
// Verify all evaluations completed
let mut successful_evaluations = 0;
let mut failed_evaluations = 0;
for result in evaluation_results {
let (task_id, eval_result) = result.expect("Evaluation task should complete without panicking");
match eval_result {
Ok((read_result, update_result)) => {
assert!(read_result.is_ok(), "Directory read should succeed for task {}", task_id);
assert!(update_result.is_ok(), "Directory update should succeed for task {}", task_id);
successful_evaluations += 1;
}
Err(_) => {
failed_evaluations += 1;
}
}
}
println!("Smart sync evaluations: {} successful, {} failed", successful_evaluations, failed_evaluations);
// Verify the system is in a consistent state
let final_directories = state.db.list_webdav_directories(user_id).await
.expect("Failed to list final directories");
assert_eq!(final_directories.len(), 3, "Should still have 3 directories");
// The main directory should have been updated by one of the successful operations
let main_dir = final_directories.iter()
.find(|d| d.directory_path == "/test")
.expect("Main directory should exist");
assert!(main_dir.directory_etag.starts_with("updated-etag-"),
"Main directory should have updated ETag: {}", main_dir.directory_etag);
}
/// Test concurrent sync triggers with stop operations
#[tokio::test]
async fn test_concurrent_sync_triggers_with_stops() {
let (_test_context, state, user_id) = create_integration_test_state().await;
// Create test source
let source = create_test_webdav_source(&state, user_id, "StoppableSource", false).await;
// Create source scheduler
let scheduler = Arc::new(SourceScheduler::new(state.clone()));
// Start multiple sync operations and stop attempts concurrently
let operations = (0..10).map(|i| {
let scheduler_clone = scheduler.clone();
let source_id = source.id;
tokio::spawn(async move {
if i % 3 == 0 {
// Trigger sync
println!("Triggering sync for operation {}", i);
let result = scheduler_clone.trigger_sync(source_id).await;
(i, "trigger", result.is_ok())
} else if i % 3 == 1 {
// Stop sync (might not have anything to stop)
println!("Stopping sync for operation {}", i);
let result = scheduler_clone.stop_sync(source_id).await;
(i, "stop", result.is_ok())
} else {
// Read source status
println!("Reading status for operation {}", i);
sleep(Duration::from_millis(50)).await; // Small delay to simulate status checks
(i, "status", true) // Status reads should always work
}
})
});
// Wait for all operations
let operation_results: Vec<_> = join_all(operations).await;
// Verify all operations completed without panicking
for result in operation_results {
let (task_id, op_type, success) = result.expect("Operation task should complete");
println!("Operation {}: {} -> {}", task_id, op_type, success);
}
// Give time for any background operations to settle
sleep(Duration::from_millis(2000)).await; // Extended timeout
// Verify source is in a stable state
let mut final_source = state.db.get_source(user_id, source.id).await
.expect("Failed to get source")
.expect("Source should exist");
// If source is still syncing, try force reset as failsafe
if matches!(final_source.status, SourceStatus::Syncing) {
println!("Source still syncing after 2s, attempting force reset...");
let scheduler = SourceScheduler::new(state.clone());
if let Err(e) = scheduler.force_reset_source(source.id).await {
println!("Force reset failed: {}", e);
} else {
sleep(Duration::from_millis(100)).await;
final_source = state.db.get_source(user_id, source.id).await
.expect("Failed to get source")
.expect("Source should exist");
println!("Source status after force reset: {:?}", final_source.status);
}
}
// Source should not be stuck in an inconsistent state
assert!(matches!(final_source.status, SourceStatus::Idle | SourceStatus::Error),
"Source should be in a stable state, got: {:?}", final_source.status);
}
/// Test concurrent source status updates during sync operations
#[tokio::test]
async fn test_concurrent_source_status_updates() {
let (_test_context, state, user_id) = create_integration_test_state().await;
// Create test source
let source = create_test_webdav_source(&state, user_id, "StatusTestSource", false).await;
// Test concurrent status updates from different "components"
let status_updates = (0..20).map(|i| {
let state_clone = state.clone();
let source_id = source.id;
tokio::spawn(async move {
let (status, message) = match i % 4 {
0 => (SourceStatus::Syncing, Some("Starting sync")),
1 => (SourceStatus::Idle, None),
2 => (SourceStatus::Error, Some("Test error")),
3 => (SourceStatus::Syncing, Some("Resuming sync")),
_ => unreachable!(),
};
// Add small random delay to increase chance of race conditions
sleep(Duration::from_millis(((i % 10) * 10) as u64)).await;
let result = if let Some(msg) = message {
sqlx::query(
r#"UPDATE sources
SET status = $2, last_error = $3, last_error_at = NOW(), updated_at = NOW()
WHERE id = $1"#
)
.bind(source_id)
.bind(status.to_string())
.bind(msg)
.execute(state_clone.db.get_pool())
.await
} else {
sqlx::query(
r#"UPDATE sources
SET status = $2, last_error = NULL, last_error_at = NULL, updated_at = NOW()
WHERE id = $1"#
)
.bind(source_id)
.bind(status.to_string())
.execute(state_clone.db.get_pool())
.await
};
(i, status, result.is_ok())
})
});
// Wait for all status updates
let update_results: Vec<_> = join_all(status_updates).await;
// Verify all updates completed successfully
for result in update_results {
let (task_id, expected_status, success) = result.expect("Status update task should complete");
assert!(success, "Status update {} to {:?} should succeed", task_id, expected_status);
}
// Verify final source state is consistent
let final_source = state.db.get_source(user_id, source.id).await
.expect("Failed to get source")
.expect("Source should exist");
// Source should have a valid status (one of the updates should have succeeded)
assert!(matches!(final_source.status,
SourceStatus::Idle | SourceStatus::Syncing | SourceStatus::Error),
"Source should have a valid status: {:?}", final_source.status);
// Verify database consistency - updated_at is a DateTime, not Option
println!("Source updated at: {:?}", final_source.updated_at);
}
/// Test concurrent directory ETag updates during smart sync operations
#[tokio::test]
async fn test_concurrent_directory_etag_updates_during_smart_sync() {
let (_test_context, state, user_id) = create_integration_test_state().await;
// Create initial directory structure
let base_directories = vec![
("/test", "base-etag-1"),
("/test/docs", "base-etag-2"),
("/test/images", "base-etag-3"),
("/test/archive", "base-etag-4"),
];
for (path, etag) in &base_directories {
let directory = CreateWebDAVDirectory {
user_id,
directory_path: path.to_string(),
directory_etag: etag.to_string(),
file_count: 10,
total_size_bytes: 1024,
};
state.db.create_or_update_webdav_directory(&directory).await
.expect("Failed to create base directory");
}
// Simulate concurrent smart sync operations updating directory ETags
let smart_sync_updates = (0..15).map(|i| {
let state_clone = state.clone();
let user_id = user_id;
let base_dirs = base_directories.clone(); // Clone for use in async task
tokio::spawn(async move {
// Pick a directory to update
let dir_index = i % base_dirs.len();
let (path, _) = &base_dirs[dir_index];
// Simulate smart sync discovering changes
sleep(Duration::from_millis(((i % 5) * 20) as u64)).await;
// Create "discovered" directory info with new ETag
let updated_directory = CreateWebDAVDirectory {
user_id,
directory_path: path.to_string(),
directory_etag: format!("smart-sync-etag-{}-{}", dir_index, i),
file_count: 10 + i as i64,
total_size_bytes: 1024 + (i as i64 * 100),
};
// Update directory (simulates smart sync saving discovered ETags)
let result = state_clone.db.create_or_update_webdav_directory(&updated_directory).await;
(i, dir_index, path.to_string(), result.is_ok())
})
});
// Wait for all smart sync updates
let update_results: Vec<_> = join_all(smart_sync_updates).await;
// Verify all updates completed
for result in update_results {
let (task_id, dir_index, path, success) = result.expect("Smart sync update task should complete");
assert!(success, "Smart sync update {} for directory {} ({}) should succeed",
task_id, dir_index, path);
}
// Verify final directory state
let final_directories = state.db.list_webdav_directories(user_id).await
.expect("Failed to list final directories");
assert_eq!(final_directories.len(), base_directories.len(),
"Should have same number of directories");
// Verify all directories have been updated with smart sync ETags
let mut updated_count = 0;
for directory in final_directories {
if directory.directory_etag.contains("smart-sync-etag-") {
updated_count += 1;
}
// File count should have been updated by at least one operation
assert!(directory.file_count >= 10,
"Directory {} should have updated file count: {}",
directory.directory_path, directory.file_count);
}
assert!(updated_count > 0,
"At least some directories should have smart sync ETags, got {} updated",
updated_count);
}
/// Test resilience to partial failures during concurrent operations
#[tokio::test]
async fn test_concurrent_operations_with_partial_failures() {
let (_test_context, state, user_id) = create_integration_test_state().await;
// Create multiple sources for testing
let sources = vec![
create_test_webdav_source(&state, user_id, "ReliableSource", false).await,
create_test_webdav_source(&state, user_id, "UnreliableSource", false).await,
create_test_webdav_source(&state, user_id, "SlowSource", false).await,
];
// Create source scheduler
let scheduler = Arc::new(SourceScheduler::new(state.clone()));
// Mix of operations that might succeed or fail
let mixed_operations = (0..20).map(|i| {
let scheduler_clone = scheduler.clone();
let state_clone = state.clone();
let source_id = sources[i % sources.len()].id;
let user_id = user_id;
tokio::spawn(async move {
match i % 5 {
0 => {
// Normal sync trigger
let result = scheduler_clone.trigger_sync(source_id).await;
(i, "trigger", result.is_ok(), None)
}
1 => {
// Stop operation (might have nothing to stop)
let result = scheduler_clone.stop_sync(source_id).await;
(i, "stop", true, None) // Always consider stop attempts as successful
}
2 => {
// Database read operation
let result = state_clone.db.get_source(user_id, source_id).await;
(i, "read", result.is_ok(), None)
}
3 => {
// Status update operation
let result = sqlx::query(
"UPDATE sources SET status = 'idle', updated_at = NOW() WHERE id = $1"
)
.bind(source_id)
.execute(state_clone.db.get_pool())
.await;
(i, "status_update", result.is_ok(), None)
}
4 => {
// Directory listing operation (simulates smart sync evaluation)
let result = state_clone.db.list_webdav_directories(user_id).await;
(i, "list_dirs", result.is_ok(), Some(result.map(|dirs| dirs.len()).unwrap_or(0)))
}
_ => unreachable!(),
}
})
});
// Wait for all operations
let operation_results: Vec<_> = join_all(mixed_operations).await;
// Analyze results
let mut operation_stats = HashMap::new();
let mut total_operations = 0;
let mut successful_operations = 0;
for result in operation_results {
let task_result = result.expect("Operation task should complete without panicking");
let (task_id, op_type, success, extra_info) = task_result;
*operation_stats.entry(op_type).or_insert(0) += 1;
total_operations += 1;
if success {
successful_operations += 1;
}
println!("Operation {}: {} -> {} {:?}", task_id, op_type, success, extra_info);
}
println!("Operation statistics: {:?}", operation_stats);
println!("Success rate: {}/{} ({:.1}%)",
successful_operations, total_operations,
(successful_operations as f64 / total_operations as f64) * 100.0);
// Verify system resilience
assert!(successful_operations > 0, "At least some operations should succeed");
// Save the first source ID for later use
let first_source_id = sources[0].id;
// Verify all sources are in consistent states
for source in sources {
let final_source = state.db.get_source(user_id, source.id).await
.expect("Failed to get source")
.expect("Source should exist");
// Source should be in a valid state (not corrupted by partial failures)
assert!(matches!(final_source.status,
SourceStatus::Idle | SourceStatus::Syncing | SourceStatus::Error),
"Source {} should be in valid state: {:?}", source.name, final_source.status);
}
// System should remain functional for new operations
let recovery_test = scheduler.trigger_sync(first_source_id).await;
// Recovery might succeed or fail, but shouldn't panic
println!("Recovery test result: {:?}", recovery_test.is_ok());
}