Files
readur/tests/integration_source_sync_cancellation_workflow_tests.rs

1539 lines
57 KiB
Rust

/*!
* Source Sync Cancellation Workflow Integration Tests
*
* Comprehensive end-to-end integration tests for source sync cancellation functionality:
* - Full sync cancellation workflow via API endpoints
* - Cancellation during different sync phases
* - Multiple cancellation request handling
* - Status monitoring and transitions
* - Resource cleanup verification
* - Database state consistency
*/
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
use chrono::Utc;
use serde_json::json;
use tokio::time::sleep;
use axum::{
body::Body,
extract::Path,
http::{Request, StatusCode},
Router,
};
use tower::ServiceExt;
use readur::{
AppState,
config::Config,
db::Database,
models::{Source, SourceType, SourceStatus, User, CreateSource, CreateUser, UserRole},
auth::Claims,
test_helpers::create_test_config_with_db,
};
/// Create a test app state with database and real source scheduler
async fn create_test_app_state() -> Arc<AppState> {
let database_url = std::env::var("TEST_DATABASE_URL")
.or_else(|_| std::env::var("DATABASE_URL"))
.unwrap_or_else(|_| "postgresql://readur:readur@localhost:5432/readur".to_string());
let mut config = create_test_config_with_db(&database_url);
config.server_address = "127.0.0.1:8080".to_string();
config.jwt_secret = "test_secret_for_sync_cancellation".to_string();
config.upload_path = "/tmp/test_uploads_sync_cancel".to_string();
config.watch_folder = "/tmp/watch_sync_cancel".to_string();
config.allowed_file_types = vec!["pdf".to_string(), "txt".to_string(), "jpg".to_string(), "png".to_string()];
let db = Database::new(&config.database_url).await.unwrap();
// Create file service
let storage_config = readur::storage::StorageConfig::Local { upload_path: config.upload_path.clone() };
let storage_backend = readur::storage::factory::create_storage_backend(storage_config).await.unwrap();
let file_service = Arc::new(readur::services::file_service::FileService::with_storage(config.upload_path.clone(), storage_backend));
let queue_service = Arc::new(readur::ocr::queue::OcrQueueService::new(
db.clone(),
db.pool.clone(),
2,
file_service.clone(),
100,
100,
));
let sync_progress_tracker = Arc::new(readur::services::sync_progress_tracker::SyncProgressTracker::new());
// Create initial app state
let app_state = AppState {
db: db.clone(),
config,
file_service: file_service.clone(),
webdav_scheduler: None,
source_scheduler: None,
queue_service,
oidc_client: None,
sync_progress_tracker,
user_watch_service: None,
webdav_metrics_collector: None,
};
// Wrap in Arc for sharing
let state_arc = Arc::new(app_state);
// Create the real source scheduler
let source_scheduler = Arc::new(readur::scheduling::source_scheduler::SourceScheduler::new(state_arc.clone()));
// Now we need to update the AppState with the scheduler
// Since AppState is already wrapped in Arc, we need to use a different approach
// Let's create a new AppState with the scheduler
Arc::new(AppState {
db: state_arc.db.clone(),
config: state_arc.config.clone(),
file_service: state_arc.file_service.clone(),
webdav_scheduler: None,
source_scheduler: Some(source_scheduler),
queue_service: state_arc.queue_service.clone(),
oidc_client: None,
sync_progress_tracker: state_arc.sync_progress_tracker.clone(),
user_watch_service: None,
webdav_metrics_collector: None,
})
}
/// Create a test user for sync cancellation tests
async fn create_test_user(state: &AppState) -> User {
let user_id = Uuid::new_v4();
let create_user = CreateUser {
username: format!("testuser_sync_cancel_{}", user_id),
email: format!("testuser_sync_cancel_{}@example.com", user_id),
password: "test_password".to_string(),
role: Some(UserRole::Admin),
};
state.db.create_user(create_user).await.unwrap()
}
/// Create a test WebDAV source for cancellation testing
/// Uses a non-existent server so sync will fail, but we can test the cancellation workflow
async fn create_test_webdav_source(state: &AppState, user_id: Uuid, name: &str) -> Source {
let create_source = CreateSource {
name: name.to_string(),
source_type: SourceType::WebDAV,
enabled: Some(true),
config: json!({
"server_url": "https://test-webdav-server-for-cancellation-testing.example.com/remote.php/webdav",
"username": "test_user",
"password": "test_password",
"watch_folders": ["/TestFolder"],
"file_extensions": [".pdf", ".txt", ".jpg", ".png"],
"auto_sync": false,
"sync_interval_minutes": 60,
"server_type": "nextcloud"
}),
};
state.db.create_source(user_id, &create_source).await.unwrap()
}
/// Wait for a source to reach a specific status with timeout
async fn wait_for_source_status(
state: &AppState,
user_id: Uuid,
source_id: Uuid,
expected_status: SourceStatus,
timeout_ms: u64
) -> bool {
let start_time = std::time::Instant::now();
let timeout_duration = Duration::from_millis(timeout_ms);
while start_time.elapsed() < timeout_duration {
if let Ok(Some(source)) = state.db.get_source(user_id, source_id).await {
if source.status == expected_status {
return true;
}
}
sleep(Duration::from_millis(100)).await;
}
false
}
/// Wait for sync to actually start (status becomes Syncing)
async fn wait_for_sync_to_start(
state: &AppState,
user_id: Uuid,
source_id: Uuid,
timeout_ms: u64
) -> bool {
wait_for_source_status(state, user_id, source_id, SourceStatus::Syncing, timeout_ms).await
}
/// Wait for sync to stop (status becomes Idle or Error)
async fn wait_for_sync_to_stop(
state: &AppState,
user_id: Uuid,
source_id: Uuid,
timeout_ms: u64
) -> bool {
let start_time = std::time::Instant::now();
let timeout_duration = Duration::from_millis(timeout_ms);
while start_time.elapsed() < timeout_duration {
if let Ok(Some(source)) = state.db.get_source(user_id, source_id).await {
if matches!(source.status, SourceStatus::Idle | SourceStatus::Error) {
return true;
}
}
sleep(Duration::from_millis(100)).await;
}
false
}
/// Create HTTP client for API testing
fn create_test_app(state: Arc<AppState>) -> Router {
use axum::{routing::get, Router};
Router::new()
.route("/api/health", get(readur::health_check))
.nest("/api/auth", readur::routes::auth::router())
.nest("/api/documents", readur::routes::documents::router())
.nest("/api/ignored-files", readur::routes::ignored_files::ignored_files_routes())
.nest("/api/labels", readur::routes::labels::router())
.nest("/api/metrics", readur::routes::metrics::router())
.nest("/metrics", readur::routes::prometheus_metrics::router())
.nest("/api/notifications", readur::routes::notifications::router())
.nest("/api/ocr", readur::routes::ocr::router())
.nest("/api/queue", readur::routes::queue::router())
.nest("/api/search", readur::routes::search::router())
.nest("/api/settings", readur::routes::settings::router())
.nest("/api/sources", readur::routes::sources::router())
.nest("/api/users", readur::routes::users::router())
.nest("/api/webdav", readur::routes::webdav::router())
.with_state(state)
}
/// Create authorization header for test user
fn create_auth_header(user: &User, jwt_secret: &str) -> String {
let claims = Claims {
sub: user.id,
username: user.username.clone(),
exp: (chrono::Utc::now() + chrono::Duration::hours(24)).timestamp() as usize,
};
let token = jsonwebtoken::encode(
&jsonwebtoken::Header::default(),
&claims,
&jsonwebtoken::EncodingKey::from_secret(jwt_secret.as_ref()),
).unwrap();
format!("Bearer {}", token)
}
#[tokio::test]
async fn test_complete_sync_cancellation_workflow() {
println!("🧪 Testing complete sync cancellation workflow");
let state = create_test_app_state().await;
let user = create_test_user(&state).await;
let source = create_test_webdav_source(&state, user.id, "Test Cancellation Source").await;
let app = create_test_app(state.clone());
// Create auth header
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
println!("✅ Created test user and source: {}", source.id);
// Step 1: Verify source is initially idle
let response = app
.clone()
.oneshot(
Request::builder()
.method("GET")
.uri(format!("/api/sources/{}/sync/status", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
println!("✅ Source is initially idle");
// Step 2: Start sync using the real scheduler
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let actual_status = response.status();
println!("🔍 Sync start actual status: {}", actual_status);
// With real scheduler, should return OK (unless already running)
assert!(matches!(actual_status, StatusCode::OK | StatusCode::CONFLICT));
println!("✅ Sync start request completed with status: {}", actual_status);
// Step 3: Wait for sync to actually start (with real scheduler)
let sync_started = wait_for_sync_to_start(&state, user.id, source.id, 5000).await;
if sync_started {
println!("✅ Sync actually started - status changed to Syncing");
// Give it a moment to establish the sync
sleep(Duration::from_millis(500)).await;
} else {
println!("⚠️ Sync did not start within timeout (may fail quickly due to invalid server)");
// The sync might fail immediately due to invalid server, which is fine for testing cancellation
}
// Step 4: Cancel the sync
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
println!("✅ Sync cancellation request successful");
// Step 5: Wait for sync to actually stop with real scheduler
let sync_stopped = wait_for_sync_to_stop(&state, user.id, source.id, 10000).await;
if sync_stopped {
println!("✅ Sync actually stopped - status changed to Idle/Error");
} else {
println!("⚠️ Sync did not stop within timeout, checking current status");
}
let source_after_cancel = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
println!("✅ Source status after cancellation: {:?}", source_after_cancel.status);
// With real scheduler, we should see proper status transitions
assert!(matches!(source_after_cancel.status, SourceStatus::Idle | SourceStatus::Error),
"Source should be Idle or Error after cancellation, got: {:?}", source_after_cancel.status);
// Step 6: Verify sync status API shows no active sync
let response = app
.clone()
.oneshot(
Request::builder()
.method("GET")
.uri(format!("/api/sources/{}/sync/status", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
println!("✅ Sync status API accessible after cancellation");
// Cleanup
state.db.delete_source(user.id, source.id).await.unwrap();
state.db.delete_user(user.id).await.unwrap();
println!("🎉 Complete sync cancellation workflow test passed");
}
#[tokio::test]
async fn test_multiple_cancellation_requests() {
println!("🧪 Testing multiple cancellation requests handling");
let state = create_test_app_state().await;
let user = create_test_user(&state).await;
let source = create_test_webdav_source(&state, user.id, "Multiple Cancel Test Source").await;
let app = create_test_app(state.clone());
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
println!("✅ Created test setup for multiple cancellation test");
// Start sync
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!("✅ Sync started with status: {}", response.status());
// Wait briefly
sleep(Duration::from_millis(200)).await;
// Send multiple cancellation requests concurrently
let mut cancel_handles = Vec::new();
for i in 0..3 {
let app_clone = app.clone();
let auth_header_clone = auth_header.clone();
let source_id = source.id;
let handle = tokio::spawn(async move {
let response = app_clone
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source_id))
.header("Authorization", auth_header_clone)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!("✅ Cancellation request {} completed with status: {}", i + 1, response.status());
response.status()
});
cancel_handles.push(handle);
}
// Wait for all cancellation requests to complete
let mut success_count = 0;
for handle in cancel_handles {
let status = handle.await.unwrap();
if status == StatusCode::OK {
success_count += 1;
}
}
// All cancellation requests should succeed (idempotent)
assert_eq!(success_count, 3);
println!("✅ All {} cancellation requests succeeded", success_count);
// Verify final state
sleep(Duration::from_millis(1000)).await;
let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
println!("✅ Final source status: {:?}", final_source.status);
// Cleanup
state.db.delete_source(user.id, source.id).await.unwrap();
state.db.delete_user(user.id).await.unwrap();
println!("🎉 Multiple cancellation requests test passed");
}
#[tokio::test]
async fn test_cancellation_without_active_sync() {
println!("🧪 Testing cancellation when no sync is active");
let state = create_test_app_state().await;
let user = create_test_user(&state).await;
let source = create_test_webdav_source(&state, user.id, "No Active Sync Source").await;
let app = create_test_app(state.clone());
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
println!("✅ Created test setup for no active sync test");
// Verify source is idle
let initial_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
assert_eq!(initial_source.status, SourceStatus::Idle);
println!("✅ Source is initially idle: {:?}", initial_source.status);
// Try to cancel sync when none is active
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
// Should succeed (idempotent behavior)
assert_eq!(response.status(), StatusCode::OK);
println!("✅ Cancellation without active sync succeeded");
// Verify source remains idle
let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
assert_eq!(final_source.status, SourceStatus::Idle);
println!("✅ Source remains idle after cancellation: {:?}", final_source.status);
// Cleanup
state.db.delete_source(user.id, source.id).await.unwrap();
state.db.delete_user(user.id).await.unwrap();
println!("🎉 Cancellation without active sync test passed");
}
#[tokio::test]
async fn test_sync_status_monitoring_during_cancellation() {
println!("🧪 Testing sync status monitoring during cancellation");
let state = create_test_app_state().await;
let user = create_test_user(&state).await;
let source = create_test_webdav_source(&state, user.id, "Status Monitor Source").await;
let app = create_test_app(state.clone());
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
println!("✅ Created test setup for status monitoring test");
// Start sync
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!("✅ Sync started with status: {}", response.status());
// Monitor sync status before cancellation
let response = app
.clone()
.oneshot(
Request::builder()
.method("GET")
.uri(format!("/api/sources/{}/sync/status", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
println!("✅ Sync status API accessible before cancellation");
// Cancel sync
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
println!("✅ Sync cancellation successful");
// Monitor sync status after cancellation
sleep(Duration::from_millis(500)).await;
let response = app
.clone()
.oneshot(
Request::builder()
.method("GET")
.uri(format!("/api/sources/{}/sync/status", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
println!("✅ Sync status API accessible after cancellation");
// Check database state
let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
println!("✅ Final database status: {:?}", final_source.status);
// Cleanup
state.db.delete_source(user.id, source.id).await.unwrap();
state.db.delete_user(user.id).await.unwrap();
println!("🎉 Sync status monitoring during cancellation test passed");
}
#[tokio::test]
async fn test_cancellation_with_unauthorized_user() {
println!("🧪 Testing cancellation with unauthorized user");
let state = create_test_app_state().await;
let owner_user = create_test_user(&state).await;
let unauthorized_user = create_test_user(&state).await;
let source = create_test_webdav_source(&state, owner_user.id, "Unauthorized Test Source").await;
let app = create_test_app(state.clone());
let unauthorized_auth_header = create_auth_header(&unauthorized_user, &state.config.jwt_secret);
println!("✅ Created test setup with unauthorized user");
// Try to cancel sync with unauthorized user
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &unauthorized_auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
// Should return 404 (source not found for this user)
assert_eq!(response.status(), StatusCode::NOT_FOUND);
println!("✅ Unauthorized cancellation properly rejected with 404");
// Cleanup
state.db.delete_source(owner_user.id, source.id).await.unwrap();
state.db.delete_user(owner_user.id).await.unwrap();
state.db.delete_user(unauthorized_user.id).await.unwrap();
println!("🎉 Unauthorized user cancellation test passed");
}
#[tokio::test]
async fn test_cancellation_of_nonexistent_source() {
println!("🧪 Testing cancellation of nonexistent source");
let state = create_test_app_state().await;
let user = create_test_user(&state).await;
let app = create_test_app(state.clone());
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
let nonexistent_source_id = Uuid::new_v4();
println!("✅ Created test setup for nonexistent source test");
// Try to cancel sync for nonexistent source
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", nonexistent_source_id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
// Should return 404
assert_eq!(response.status(), StatusCode::NOT_FOUND);
println!("✅ Nonexistent source cancellation properly rejected with 404");
// Cleanup
state.db.delete_user(user.id).await.unwrap();
println!("🎉 Nonexistent source cancellation test passed");
}
#[tokio::test]
async fn test_sync_start_cancel_start_sequence() {
println!("🧪 Testing sync start -> cancel -> start sequence");
let state = create_test_app_state().await;
let user = create_test_user(&state).await;
let source = create_test_webdav_source(&state, user.id, "Start Cancel Start Source").await;
let app = create_test_app(state.clone());
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
println!("✅ Created test setup for start-cancel-start sequence");
// Step 1: Start sync
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!("✅ First sync start: {}", response.status());
// Step 2: Wait briefly then cancel
sleep(Duration::from_millis(300)).await;
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
println!("✅ Sync cancellation successful");
// Step 3: Wait for cancellation to complete
sleep(Duration::from_millis(1000)).await;
// Step 4: Start sync again
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
// Should succeed or return expected error status
let status = response.status();
assert!(matches!(status, StatusCode::OK | StatusCode::CONFLICT | StatusCode::INTERNAL_SERVER_ERROR));
println!("✅ Second sync start after cancellation: {}", status);
// Step 5: Cancel the second sync
sleep(Duration::from_millis(300)).await;
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
println!("✅ Second cancellation successful");
// Verify final state
sleep(Duration::from_millis(1000)).await;
let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
println!("✅ Final source status: {:?}", final_source.status);
// Cleanup
state.db.delete_source(user.id, source.id).await.unwrap();
state.db.delete_user(user.id).await.unwrap();
println!("🎉 Start-cancel-start sequence test passed");
}
/// Test that validates sync actually stops working, not just changes status
#[tokio::test]
async fn test_sync_actually_stops_working() {
println!("🧪 Testing that sync cancellation actually stops sync work");
let state = create_test_app_state().await;
let user = create_test_user(&state).await;
let source = create_test_webdav_source(&state, user.id, "Actual Stop Test Source").await;
let app = create_test_app(state.clone());
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
println!("✅ Created test setup for actual sync stop validation");
// First check that progress tracker shows no active syncs
let initial_active_syncs = state.sync_progress_tracker.get_active_source_ids();
assert!(initial_active_syncs.is_empty(), "Should have no active syncs initially");
assert!(!state.sync_progress_tracker.is_syncing(source.id), "Source should not be syncing initially");
// Step 1: Start sync and verify it's actually registered as active
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!("✅ Sync start response: {}", response.status());
// Step 2: Wait for sync to actually start and be registered
let mut sync_became_active = false;
for attempt in 1..=20 { // Wait up to 2 seconds
sleep(Duration::from_millis(100)).await;
if state.sync_progress_tracker.is_syncing(source.id) {
sync_became_active = true;
println!("✅ Sync became active after {} attempts ({}ms)", attempt, attempt * 100);
break;
}
}
// Verify sync actually became active
if !sync_became_active {
println!("⚠️ Sync never became active - checking database status");
let db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
println!("Database source status: {:?}", db_source.status);
// If sync didn't start due to no scheduler or other issues, that's fine for this test
// The important part is that we test stopping when sync IS active
if db_source.status != SourceStatus::Syncing {
println!("⚠️ Skipping actual stop test - sync never started (likely no scheduler available)");
// Cleanup
state.db.delete_source(user.id, source.id).await.unwrap();
state.db.delete_user(user.id).await.unwrap();
return;
}
}
// Step 3: Verify sync is tracked in multiple places
let active_syncs_before_stop = state.sync_progress_tracker.get_active_source_ids();
println!("📊 Active syncs before stop: {:?}", active_syncs_before_stop);
let progress_before_stop = state.sync_progress_tracker.get_progress(source.id);
println!("📊 Progress before stop: {:?}", progress_before_stop);
let db_source_before_stop = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
println!("📊 Database status before stop: {:?}", db_source_before_stop.status);
// Step 4: Stop the sync
let stop_response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(stop_response.status(), StatusCode::OK);
println!("✅ Stop sync request successful");
// Step 5: Verify sync actually stops working
// Check progress tracker immediately (should be unregistered)
let progress_after_stop_immediate = state.sync_progress_tracker.get_progress(source.id);
println!("📊 Progress immediately after stop: {:?}", progress_after_stop_immediate);
// Wait a bit for all cleanup to complete
sleep(Duration::from_millis(500)).await;
let active_syncs_after_stop = state.sync_progress_tracker.get_active_source_ids();
println!("📊 Active syncs after stop: {:?}", active_syncs_after_stop);
let progress_after_stop = state.sync_progress_tracker.get_progress(source.id);
println!("📊 Progress after stop with delay: {:?}", progress_after_stop);
let db_source_after_stop = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
println!("📊 Database status after stop: {:?}", db_source_after_stop.status);
// Step 6: Assertions to verify sync actually stopped
// The source should no longer be tracked as actively syncing
assert!(!state.sync_progress_tracker.is_syncing(source.id),
"Source should not be tracked as syncing after stop");
// The source should not be in the active syncs list
assert!(!active_syncs_after_stop.contains(&source.id),
"Source should not be in active syncs list after stop");
// Database status should be Idle (not Syncing)
assert_eq!(db_source_after_stop.status, SourceStatus::Idle,
"Database status should be Idle after stop");
// Progress should either be None or show as not active
if let Some(progress) = progress_after_stop {
assert!(!progress.is_active, "Progress should show as not active after stop");
}
println!("✅ All sync stop validations passed");
// Step 7: Test that sync can be restarted after stop
sleep(Duration::from_millis(1000)).await; // Wait for complete cleanup
let restart_response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!("✅ Restart after stop response: {}", restart_response.status());
// Stop the restarted sync for cleanup
sleep(Duration::from_millis(200)).await;
let final_stop_response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!("✅ Final stop response: {}", final_stop_response.status());
// Cleanup
sleep(Duration::from_millis(500)).await;
state.db.delete_source(user.id, source.id).await.unwrap();
state.db.delete_user(user.id).await.unwrap();
println!("🎉 Actual sync stop validation test passed");
}
/// Test that validates sync cancellation during different phases
#[tokio::test]
async fn test_sync_cancellation_during_different_phases() {
println!("🧪 Testing sync cancellation during different phases");
let state = create_test_app_state().await;
let user = create_test_user(&state).await;
let source = create_test_webdav_source(&state, user.id, "Phase Cancellation Test Source").await;
let app = create_test_app(state.clone());
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
println!("✅ Created test setup for phase-based cancellation");
// Test cancellation at different timing intervals to catch different phases
let test_delays = vec![50, 150, 300, 500]; // Different delays in milliseconds
for (i, delay_ms) in test_delays.iter().enumerate() {
println!("🔄 Testing cancellation after {}ms delay (iteration {})", delay_ms, i + 1);
// Start sync
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!(" 📡 Sync start status: {}", response.status());
// Wait for the specified delay to let sync progress
sleep(Duration::from_millis(*delay_ms)).await;
// Check what phase we might be in (if any)
let progress_info = state.sync_progress_tracker.get_progress(source.id);
if let Some(progress) = &progress_info {
println!(" 📊 Cancelling during phase: {} ({})", progress.phase, progress.phase_description);
} else {
println!(" 📊 No progress info available - sync may not have started or already completed");
}
// Cancel the sync
let cancel_response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(cancel_response.status(), StatusCode::OK);
println!(" ✅ Cancellation successful");
// Verify cleanup
sleep(Duration::from_millis(300)).await;
assert!(!state.sync_progress_tracker.is_syncing(source.id),
"Source should not be syncing after cancellation in iteration {}", i + 1);
let db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
assert_eq!(db_source.status, SourceStatus::Idle,
"Source should be idle after cancellation in iteration {}", i + 1);
println!(" ✅ Cleanup verified for iteration {}", i + 1);
// Wait before next iteration to ensure complete cleanup
sleep(Duration::from_millis(500)).await;
}
// Cleanup
state.db.delete_source(user.id, source.id).await.unwrap();
state.db.delete_user(user.id).await.unwrap();
println!("🎉 Phase-based cancellation test passed");
}
/// Test resource cleanup validation after sync cancellation
#[tokio::test]
async fn test_resource_cleanup_after_cancellation() {
println!("🧪 Testing resource cleanup after sync cancellation");
let state = create_test_app_state().await;
let user = create_test_user(&state).await;
let source = create_test_webdav_source(&state, user.id, "Resource Cleanup Test Source").await;
let app = create_test_app(state.clone());
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
println!("✅ Created test setup for resource cleanup validation");
// Record initial state
let initial_active_syncs = state.sync_progress_tracker.get_active_source_ids();
let initial_progress = state.sync_progress_tracker.get_progress(source.id);
println!("📊 Initial active syncs: {:?}", initial_active_syncs);
println!("📊 Initial progress: {:?}", initial_progress);
// Start sync
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!("✅ Sync started with status: {}", response.status());
// Wait for sync to become active
sleep(Duration::from_millis(200)).await;
// Record active state
let active_syncs_during = state.sync_progress_tracker.get_active_source_ids();
let progress_during = state.sync_progress_tracker.get_progress(source.id);
let is_syncing_during = state.sync_progress_tracker.is_syncing(source.id);
println!("📊 Active syncs during: {:?}", active_syncs_during);
println!("📊 Progress during: {:?}", progress_during);
println!("📊 Is syncing during: {}", is_syncing_during);
// Cancel sync
let cancel_response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(cancel_response.status(), StatusCode::OK);
println!("✅ Sync cancellation successful");
// Wait for cleanup to complete
sleep(Duration::from_millis(1000)).await;
// Verify complete cleanup
let final_active_syncs = state.sync_progress_tracker.get_active_source_ids();
let final_progress = state.sync_progress_tracker.get_progress(source.id);
let is_syncing_final = state.sync_progress_tracker.is_syncing(source.id);
let db_source_final = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
println!("📊 Final active syncs: {:?}", final_active_syncs);
println!("📊 Final progress: {:?}", final_progress);
println!("📊 Is syncing final: {}", is_syncing_final);
println!("📊 Final DB status: {:?}", db_source_final.status);
// Assertions for complete cleanup
assert!(!final_active_syncs.contains(&source.id),
"Source should be removed from active syncs list");
assert!(!is_syncing_final,
"Progress tracker should not show source as syncing");
assert_eq!(db_source_final.status, SourceStatus::Idle,
"Database should show source as Idle");
// If progress exists, it should not be active
if let Some(progress) = final_progress {
assert!(!progress.is_active, "Any remaining progress should show as inactive");
}
// Test multiple rapid start/stop cycles to stress test cleanup
println!("🔄 Testing rapid start/stop cycles");
for cycle in 1..=3 {
println!(" 🔄 Cycle {}", cycle);
// Start
let start_response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!(" 📡 Start: {}", start_response.status());
// Brief wait
sleep(Duration::from_millis(100)).await;
// Stop
let stop_response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!(" 🛑 Stop: {}", stop_response.status());
// Verify cleanup after each cycle
sleep(Duration::from_millis(300)).await;
assert!(!state.sync_progress_tracker.is_syncing(source.id),
"Source should not be syncing after cycle {}", cycle);
let db_check = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
assert_eq!(db_check.status, SourceStatus::Idle,
"Source should be idle after cycle {}", cycle);
}
println!("✅ Rapid cycle cleanup verified");
// Cleanup
state.db.delete_source(user.id, source.id).await.unwrap();
state.db.delete_user(user.id).await.unwrap();
println!("🎉 Resource cleanup validation test passed");
}
/// Test that validates cancellation token propagation through sync layers
#[tokio::test]
async fn test_cancellation_token_propagation() {
println!("🧪 Testing cancellation token propagation through sync layers");
let state = create_test_app_state().await;
let user = create_test_user(&state).await;
let source = create_test_webdav_source(&state, user.id, "Token Propagation Test Source").await;
let app = create_test_app(state.clone());
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
// Create multiple sources to test concurrent cancellation handling
let source2 = create_test_webdav_source(&state, user.id, "Second Token Test Source").await;
let source3 = create_test_webdav_source(&state, user.id, "Third Token Test Source").await;
println!("✅ Created test setup with multiple sources for token propagation");
// Start multiple syncs concurrently
let sync_futures = vec![
app.clone().oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
),
app.clone().oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source2.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
),
app.clone().oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source3.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
),
];
let results = futures::future::join_all(sync_futures).await;
for (i, result) in results.iter().enumerate() {
if let Ok(response) = result {
println!("✅ Source {} sync start: {}", i + 1, response.status());
}
}
// Wait for syncs to potentially start
sleep(Duration::from_millis(300)).await;
// Record which sources are actually active
let active_before = state.sync_progress_tracker.get_active_source_ids();
println!("📊 Active syncs before cancellation: {:?}", active_before);
// Test individual cancellation (should only affect specific source)
let cancel_response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(cancel_response.status(), StatusCode::OK);
println!("✅ Individual source cancellation successful");
// Wait for cancellation to propagate
sleep(Duration::from_millis(500)).await;
// Verify that only the cancelled source stopped
let active_after_individual = state.sync_progress_tracker.get_active_source_ids();
println!("📊 Active syncs after individual cancellation: {:?}", active_after_individual);
// The cancelled source should not be active
assert!(!state.sync_progress_tracker.is_syncing(source.id),
"Cancelled source should not be syncing");
// Other sources might still be active (depending on implementation)
// The key test is that the cancellation was isolated to the correct source
// Cancel the remaining sources
let remaining_cancel_futures = vec![
app.clone().oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source2.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
),
app.clone().oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source3.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
),
];
let cancel_results = futures::future::join_all(remaining_cancel_futures).await;
for (i, result) in cancel_results.iter().enumerate() {
if let Ok(response) = result {
println!("✅ Remaining source {} cancel: {}", i + 2, response.status());
}
}
// Wait for all cancellations to complete
sleep(Duration::from_millis(1000)).await;
// Verify all sources are now idle
let final_active = state.sync_progress_tracker.get_active_source_ids();
println!("📊 Final active syncs: {:?}", final_active);
assert!(!state.sync_progress_tracker.is_syncing(source.id), "Source 1 should not be syncing");
assert!(!state.sync_progress_tracker.is_syncing(source2.id), "Source 2 should not be syncing");
assert!(!state.sync_progress_tracker.is_syncing(source3.id), "Source 3 should not be syncing");
// Verify database states
let db_sources = vec![
state.db.get_source(user.id, source.id).await.unwrap().unwrap(),
state.db.get_source(user.id, source2.id).await.unwrap().unwrap(),
state.db.get_source(user.id, source3.id).await.unwrap().unwrap(),
];
for (i, db_source) in db_sources.iter().enumerate() {
assert_eq!(db_source.status, SourceStatus::Idle,
"Database source {} should be idle", i + 1);
println!("📊 Database source {} status: {:?}", i + 1, db_source.status);
}
// Cleanup
state.db.delete_source(user.id, source.id).await.unwrap();
state.db.delete_source(user.id, source2.id).await.unwrap();
state.db.delete_source(user.id, source3.id).await.unwrap();
state.db.delete_user(user.id).await.unwrap();
println!("🎉 Cancellation token propagation test passed");
}
/// Comprehensive test that validates the complete sync cancellation workflow
/// This is the main test that covers all aspects of sync cancellation
#[tokio::test]
async fn test_comprehensive_sync_cancellation_validation() {
println!("🧪 COMPREHENSIVE TEST: Complete sync cancellation validation");
let state = create_test_app_state().await;
let user = create_test_user(&state).await;
let source = create_test_webdav_source(&state, user.id, "Comprehensive Cancellation Test").await;
let app = create_test_app(state.clone());
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
println!("✅ Created comprehensive test environment");
// PHASE 1: Validate initial state
println!("📝 PHASE 1: Initial state validation");
let initial_db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
let initial_active_syncs = state.sync_progress_tracker.get_active_source_ids();
let initial_is_syncing = state.sync_progress_tracker.is_syncing(source.id);
let initial_progress = state.sync_progress_tracker.get_progress(source.id);
assert_eq!(initial_db_source.status, SourceStatus::Idle, "Initial DB status should be Idle");
assert!(initial_active_syncs.is_empty(), "Initial active syncs should be empty");
assert!(!initial_is_syncing, "Initial sync state should be false");
assert!(initial_progress.is_none(), "Initial progress should be None");
println!("✅ PHASE 1 PASSED: All initial states correct");
// PHASE 2: Start sync and validate activation
println!("📝 PHASE 2: Sync activation validation");
let start_response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!("📡 Sync start response: {}", start_response.status());
// Wait for sync to activate and check multiple indicators
let mut sync_activation_verified = false;
for attempt in 1..=30 { // Wait up to 3 seconds
sleep(Duration::from_millis(100)).await;
let db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
let is_syncing = state.sync_progress_tracker.is_syncing(source.id);
let active_syncs = state.sync_progress_tracker.get_active_source_ids();
if db_source.status == SourceStatus::Syncing || is_syncing || active_syncs.contains(&source.id) {
sync_activation_verified = true;
println!("✅ Sync activation verified after {} attempts:", attempt);
println!(" 📊 DB Status: {:?}", db_source.status);
println!(" 📊 Is Syncing: {}", is_syncing);
println!(" 📊 Active Syncs: {:?}", active_syncs);
break;
}
}
if !sync_activation_verified {
println!("⚠️ PHASE 2 CONDITIONAL PASS: Sync never activated (likely no scheduler)");
// Cleanup and exit gracefully
state.db.delete_source(user.id, source.id).await.unwrap();
state.db.delete_user(user.id).await.unwrap();
return;
}
println!("✅ PHASE 2 PASSED: Sync activation verified");
// PHASE 3: Validate active sync state across all systems
println!("📝 PHASE 3: Active sync state validation");
let active_db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
let active_is_syncing = state.sync_progress_tracker.is_syncing(source.id);
let active_syncs_list = state.sync_progress_tracker.get_active_source_ids();
let active_progress = state.sync_progress_tracker.get_progress(source.id);
println!("📊 Active state summary:");
println!(" 📊 DB Status: {:?}", active_db_source.status);
println!(" 📊 Is Syncing: {}", active_is_syncing);
println!(" 📊 Active Syncs: {:?}", active_syncs_list);
println!(" 📊 Progress Active: {:?}", active_progress.as_ref().map(|p| p.is_active));
// At least one indicator should show sync is active
let sync_indicators_active = active_db_source.status == SourceStatus::Syncing ||
active_is_syncing ||
active_syncs_list.contains(&source.id);
assert!(sync_indicators_active, "At least one sync indicator should show active state");
println!("✅ PHASE 3 PASSED: Active sync state validated");
// PHASE 4: Cancel sync and validate immediate response
println!("📝 PHASE 4: Sync cancellation execution");
let cancel_response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(cancel_response.status(), StatusCode::OK);
println!("✅ PHASE 4 PASSED: Cancellation request successful");
// PHASE 5: Validate cancellation propagation and cleanup
println!("📝 PHASE 5: Cancellation cleanup validation");
// Check immediate state (some cleanup might be instant)
let immediate_is_syncing = state.sync_progress_tracker.is_syncing(source.id);
let immediate_active_syncs = state.sync_progress_tracker.get_active_source_ids();
println!("📊 Immediate post-cancel state:");
println!(" 📊 Is Syncing: {}", immediate_is_syncing);
println!(" 📊 Active Syncs: {:?}", immediate_active_syncs);
// Wait for complete cleanup
sleep(Duration::from_millis(1500)).await;
let final_db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
let final_is_syncing = state.sync_progress_tracker.is_syncing(source.id);
let final_active_syncs = state.sync_progress_tracker.get_active_source_ids();
let final_progress = state.sync_progress_tracker.get_progress(source.id);
println!("📊 Final post-cancel state:");
println!(" 📊 DB Status: {:?}", final_db_source.status);
println!(" 📊 Is Syncing: {}", final_is_syncing);
println!(" 📊 Active Syncs: {:?}", final_active_syncs);
println!(" 📊 Progress: {:?}", final_progress.as_ref().map(|p| (p.is_active, &p.phase)));
// CRITICAL ASSERTIONS: These must all pass for proper cancellation
assert_eq!(final_db_source.status, SourceStatus::Idle,
"CRITICAL: Database status must be Idle after cancellation");
assert!(!final_is_syncing,
"CRITICAL: Progress tracker must not show source as syncing");
assert!(!final_active_syncs.contains(&source.id),
"CRITICAL: Source must not be in active syncs list");
if let Some(progress) = final_progress {
assert!(!progress.is_active,
"CRITICAL: Any remaining progress must show as inactive");
}
println!("✅ PHASE 5 PASSED: Complete cancellation cleanup verified");
// PHASE 6: Validate restart capability after cancellation
println!("📝 PHASE 6: Post-cancellation restart validation");
sleep(Duration::from_millis(500)).await; // Ensure complete cleanup
let restart_response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!("📡 Restart response: {}", restart_response.status());
// The restart should succeed (or fail with expected reasons, not due to lingering state)
let acceptable_restart_statuses = [StatusCode::OK, StatusCode::CONFLICT,
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::NOT_IMPLEMENTED];
assert!(acceptable_restart_statuses.contains(&restart_response.status()),
"Restart should succeed or fail with expected status, got: {}", restart_response.status());
// Clean up the restarted sync
sleep(Duration::from_millis(200)).await;
let final_cleanup_response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("/api/sources/{}/sync/stop", source.id))
.header("Authorization", &auth_header)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
println!("📡 Final cleanup response: {}", final_cleanup_response.status());
println!("✅ PHASE 6 PASSED: Restart capability validated");
// Final cleanup
sleep(Duration::from_millis(500)).await;
state.db.delete_source(user.id, source.id).await.unwrap();
state.db.delete_user(user.id).await.unwrap();
println!("🎉 COMPREHENSIVE TEST PASSED: Complete sync cancellation validation successful");
println!(" ✅ All 6 phases validated successfully");
println!(" ✅ Sync actually stops working (not just status changes)");
println!(" ✅ Resources properly cleaned up");
println!(" ✅ System remains in consistent state");
}