mirror of
https://github.com/readur/readur.git
synced 2025-12-17 20:35:17 -06:00
1539 lines
57 KiB
Rust
1539 lines
57 KiB
Rust
/*!
|
|
* Source Sync Cancellation Workflow Integration Tests
|
|
*
|
|
* Comprehensive end-to-end integration tests for source sync cancellation functionality:
|
|
* - Full sync cancellation workflow via API endpoints
|
|
* - Cancellation during different sync phases
|
|
* - Multiple cancellation request handling
|
|
* - Status monitoring and transitions
|
|
* - Resource cleanup verification
|
|
* - Database state consistency
|
|
*/
|
|
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use uuid::Uuid;
|
|
use chrono::Utc;
|
|
use serde_json::json;
|
|
use tokio::time::sleep;
|
|
use axum::{
|
|
body::Body,
|
|
extract::Path,
|
|
http::{Request, StatusCode},
|
|
Router,
|
|
};
|
|
use tower::ServiceExt;
|
|
|
|
use readur::{
|
|
AppState,
|
|
config::Config,
|
|
db::Database,
|
|
models::{Source, SourceType, SourceStatus, User, CreateSource, CreateUser, UserRole},
|
|
auth::Claims,
|
|
test_helpers::create_test_config_with_db,
|
|
};
|
|
|
|
/// Create a test app state with database and real source scheduler
|
|
async fn create_test_app_state() -> Arc<AppState> {
|
|
let database_url = std::env::var("TEST_DATABASE_URL")
|
|
.or_else(|_| std::env::var("DATABASE_URL"))
|
|
.unwrap_or_else(|_| "postgresql://readur:readur@localhost:5432/readur".to_string());
|
|
|
|
let mut config = create_test_config_with_db(&database_url);
|
|
config.server_address = "127.0.0.1:8080".to_string();
|
|
config.jwt_secret = "test_secret_for_sync_cancellation".to_string();
|
|
config.upload_path = "/tmp/test_uploads_sync_cancel".to_string();
|
|
config.watch_folder = "/tmp/watch_sync_cancel".to_string();
|
|
config.allowed_file_types = vec!["pdf".to_string(), "txt".to_string(), "jpg".to_string(), "png".to_string()];
|
|
|
|
let db = Database::new(&config.database_url).await.unwrap();
|
|
|
|
// Create file service
|
|
let storage_config = readur::storage::StorageConfig::Local { upload_path: config.upload_path.clone() };
|
|
let storage_backend = readur::storage::factory::create_storage_backend(storage_config).await.unwrap();
|
|
let file_service = Arc::new(readur::services::file_service::FileService::with_storage(config.upload_path.clone(), storage_backend));
|
|
|
|
let queue_service = Arc::new(readur::ocr::queue::OcrQueueService::new(
|
|
db.clone(),
|
|
db.pool.clone(),
|
|
2,
|
|
file_service.clone(),
|
|
100,
|
|
100,
|
|
));
|
|
|
|
let sync_progress_tracker = Arc::new(readur::services::sync_progress_tracker::SyncProgressTracker::new());
|
|
|
|
// Create initial app state
|
|
let app_state = AppState {
|
|
db: db.clone(),
|
|
config,
|
|
file_service: file_service.clone(),
|
|
webdav_scheduler: None,
|
|
source_scheduler: None,
|
|
queue_service,
|
|
oidc_client: None,
|
|
sync_progress_tracker,
|
|
user_watch_service: None,
|
|
webdav_metrics_collector: None,
|
|
};
|
|
|
|
// Wrap in Arc for sharing
|
|
let state_arc = Arc::new(app_state);
|
|
|
|
// Create the real source scheduler
|
|
let source_scheduler = Arc::new(readur::scheduling::source_scheduler::SourceScheduler::new(state_arc.clone()));
|
|
|
|
// Now we need to update the AppState with the scheduler
|
|
// Since AppState is already wrapped in Arc, we need to use a different approach
|
|
// Let's create a new AppState with the scheduler
|
|
Arc::new(AppState {
|
|
db: state_arc.db.clone(),
|
|
config: state_arc.config.clone(),
|
|
file_service: state_arc.file_service.clone(),
|
|
webdav_scheduler: None,
|
|
source_scheduler: Some(source_scheduler),
|
|
queue_service: state_arc.queue_service.clone(),
|
|
oidc_client: None,
|
|
sync_progress_tracker: state_arc.sync_progress_tracker.clone(),
|
|
user_watch_service: None,
|
|
webdav_metrics_collector: None,
|
|
})
|
|
}
|
|
|
|
/// Create a test user for sync cancellation tests
|
|
async fn create_test_user(state: &AppState) -> User {
|
|
let user_id = Uuid::new_v4();
|
|
let create_user = CreateUser {
|
|
username: format!("testuser_sync_cancel_{}", user_id),
|
|
email: format!("testuser_sync_cancel_{}@example.com", user_id),
|
|
password: "test_password".to_string(),
|
|
role: Some(UserRole::Admin),
|
|
};
|
|
|
|
state.db.create_user(create_user).await.unwrap()
|
|
}
|
|
|
|
/// Create a test WebDAV source for cancellation testing
|
|
/// Uses a non-existent server so sync will fail, but we can test the cancellation workflow
|
|
async fn create_test_webdav_source(state: &AppState, user_id: Uuid, name: &str) -> Source {
|
|
let create_source = CreateSource {
|
|
name: name.to_string(),
|
|
source_type: SourceType::WebDAV,
|
|
enabled: Some(true),
|
|
config: json!({
|
|
"server_url": "https://test-webdav-server-for-cancellation-testing.example.com/remote.php/webdav",
|
|
"username": "test_user",
|
|
"password": "test_password",
|
|
"watch_folders": ["/TestFolder"],
|
|
"file_extensions": [".pdf", ".txt", ".jpg", ".png"],
|
|
"auto_sync": false,
|
|
"sync_interval_minutes": 60,
|
|
"server_type": "nextcloud"
|
|
}),
|
|
};
|
|
|
|
state.db.create_source(user_id, &create_source).await.unwrap()
|
|
}
|
|
|
|
/// Wait for a source to reach a specific status with timeout
|
|
async fn wait_for_source_status(
|
|
state: &AppState,
|
|
user_id: Uuid,
|
|
source_id: Uuid,
|
|
expected_status: SourceStatus,
|
|
timeout_ms: u64
|
|
) -> bool {
|
|
let start_time = std::time::Instant::now();
|
|
let timeout_duration = Duration::from_millis(timeout_ms);
|
|
|
|
while start_time.elapsed() < timeout_duration {
|
|
if let Ok(Some(source)) = state.db.get_source(user_id, source_id).await {
|
|
if source.status == expected_status {
|
|
return true;
|
|
}
|
|
}
|
|
sleep(Duration::from_millis(100)).await;
|
|
}
|
|
false
|
|
}
|
|
|
|
/// Wait for sync to actually start (status becomes Syncing)
|
|
async fn wait_for_sync_to_start(
|
|
state: &AppState,
|
|
user_id: Uuid,
|
|
source_id: Uuid,
|
|
timeout_ms: u64
|
|
) -> bool {
|
|
wait_for_source_status(state, user_id, source_id, SourceStatus::Syncing, timeout_ms).await
|
|
}
|
|
|
|
/// Wait for sync to stop (status becomes Idle or Error)
|
|
async fn wait_for_sync_to_stop(
|
|
state: &AppState,
|
|
user_id: Uuid,
|
|
source_id: Uuid,
|
|
timeout_ms: u64
|
|
) -> bool {
|
|
let start_time = std::time::Instant::now();
|
|
let timeout_duration = Duration::from_millis(timeout_ms);
|
|
|
|
while start_time.elapsed() < timeout_duration {
|
|
if let Ok(Some(source)) = state.db.get_source(user_id, source_id).await {
|
|
if matches!(source.status, SourceStatus::Idle | SourceStatus::Error) {
|
|
return true;
|
|
}
|
|
}
|
|
sleep(Duration::from_millis(100)).await;
|
|
}
|
|
false
|
|
}
|
|
|
|
/// Create HTTP client for API testing
|
|
fn create_test_app(state: Arc<AppState>) -> Router {
|
|
use axum::{routing::get, Router};
|
|
|
|
Router::new()
|
|
.route("/api/health", get(readur::health_check))
|
|
.nest("/api/auth", readur::routes::auth::router())
|
|
.nest("/api/documents", readur::routes::documents::router())
|
|
.nest("/api/ignored-files", readur::routes::ignored_files::ignored_files_routes())
|
|
.nest("/api/labels", readur::routes::labels::router())
|
|
.nest("/api/metrics", readur::routes::metrics::router())
|
|
.nest("/metrics", readur::routes::prometheus_metrics::router())
|
|
.nest("/api/notifications", readur::routes::notifications::router())
|
|
.nest("/api/ocr", readur::routes::ocr::router())
|
|
.nest("/api/queue", readur::routes::queue::router())
|
|
.nest("/api/search", readur::routes::search::router())
|
|
.nest("/api/settings", readur::routes::settings::router())
|
|
.nest("/api/sources", readur::routes::sources::router())
|
|
.nest("/api/users", readur::routes::users::router())
|
|
.nest("/api/webdav", readur::routes::webdav::router())
|
|
.with_state(state)
|
|
}
|
|
|
|
/// Create authorization header for test user
|
|
fn create_auth_header(user: &User, jwt_secret: &str) -> String {
|
|
let claims = Claims {
|
|
sub: user.id,
|
|
username: user.username.clone(),
|
|
exp: (chrono::Utc::now() + chrono::Duration::hours(24)).timestamp() as usize,
|
|
};
|
|
|
|
let token = jsonwebtoken::encode(
|
|
&jsonwebtoken::Header::default(),
|
|
&claims,
|
|
&jsonwebtoken::EncodingKey::from_secret(jwt_secret.as_ref()),
|
|
).unwrap();
|
|
|
|
format!("Bearer {}", token)
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_complete_sync_cancellation_workflow() {
|
|
println!("🧪 Testing complete sync cancellation workflow");
|
|
|
|
let state = create_test_app_state().await;
|
|
let user = create_test_user(&state).await;
|
|
let source = create_test_webdav_source(&state, user.id, "Test Cancellation Source").await;
|
|
let app = create_test_app(state.clone());
|
|
|
|
// Create auth header
|
|
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
|
|
|
|
println!("✅ Created test user and source: {}", source.id);
|
|
|
|
// Step 1: Verify source is initially idle
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("GET")
|
|
.uri(format!("/api/sources/{}/sync/status", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
println!("✅ Source is initially idle");
|
|
|
|
// Step 2: Start sync using the real scheduler
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let actual_status = response.status();
|
|
println!("🔍 Sync start actual status: {}", actual_status);
|
|
|
|
// With real scheduler, should return OK (unless already running)
|
|
assert!(matches!(actual_status, StatusCode::OK | StatusCode::CONFLICT));
|
|
println!("✅ Sync start request completed with status: {}", actual_status);
|
|
|
|
// Step 3: Wait for sync to actually start (with real scheduler)
|
|
let sync_started = wait_for_sync_to_start(&state, user.id, source.id, 5000).await;
|
|
if sync_started {
|
|
println!("✅ Sync actually started - status changed to Syncing");
|
|
|
|
// Give it a moment to establish the sync
|
|
sleep(Duration::from_millis(500)).await;
|
|
} else {
|
|
println!("⚠️ Sync did not start within timeout (may fail quickly due to invalid server)");
|
|
// The sync might fail immediately due to invalid server, which is fine for testing cancellation
|
|
}
|
|
|
|
// Step 4: Cancel the sync
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
println!("✅ Sync cancellation request successful");
|
|
|
|
// Step 5: Wait for sync to actually stop with real scheduler
|
|
let sync_stopped = wait_for_sync_to_stop(&state, user.id, source.id, 10000).await;
|
|
if sync_stopped {
|
|
println!("✅ Sync actually stopped - status changed to Idle/Error");
|
|
} else {
|
|
println!("⚠️ Sync did not stop within timeout, checking current status");
|
|
}
|
|
|
|
let source_after_cancel = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
println!("✅ Source status after cancellation: {:?}", source_after_cancel.status);
|
|
|
|
// With real scheduler, we should see proper status transitions
|
|
assert!(matches!(source_after_cancel.status, SourceStatus::Idle | SourceStatus::Error),
|
|
"Source should be Idle or Error after cancellation, got: {:?}", source_after_cancel.status);
|
|
|
|
// Step 6: Verify sync status API shows no active sync
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("GET")
|
|
.uri(format!("/api/sources/{}/sync/status", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
println!("✅ Sync status API accessible after cancellation");
|
|
|
|
// Cleanup
|
|
state.db.delete_source(user.id, source.id).await.unwrap();
|
|
state.db.delete_user(user.id).await.unwrap();
|
|
|
|
println!("🎉 Complete sync cancellation workflow test passed");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_multiple_cancellation_requests() {
|
|
println!("🧪 Testing multiple cancellation requests handling");
|
|
|
|
let state = create_test_app_state().await;
|
|
let user = create_test_user(&state).await;
|
|
let source = create_test_webdav_source(&state, user.id, "Multiple Cancel Test Source").await;
|
|
let app = create_test_app(state.clone());
|
|
|
|
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
|
|
|
|
println!("✅ Created test setup for multiple cancellation test");
|
|
|
|
// Start sync
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!("✅ Sync started with status: {}", response.status());
|
|
|
|
// Wait briefly
|
|
sleep(Duration::from_millis(200)).await;
|
|
|
|
// Send multiple cancellation requests concurrently
|
|
let mut cancel_handles = Vec::new();
|
|
|
|
for i in 0..3 {
|
|
let app_clone = app.clone();
|
|
let auth_header_clone = auth_header.clone();
|
|
let source_id = source.id;
|
|
|
|
let handle = tokio::spawn(async move {
|
|
let response = app_clone
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source_id))
|
|
.header("Authorization", auth_header_clone)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!("✅ Cancellation request {} completed with status: {}", i + 1, response.status());
|
|
response.status()
|
|
});
|
|
|
|
cancel_handles.push(handle);
|
|
}
|
|
|
|
// Wait for all cancellation requests to complete
|
|
let mut success_count = 0;
|
|
for handle in cancel_handles {
|
|
let status = handle.await.unwrap();
|
|
if status == StatusCode::OK {
|
|
success_count += 1;
|
|
}
|
|
}
|
|
|
|
// All cancellation requests should succeed (idempotent)
|
|
assert_eq!(success_count, 3);
|
|
println!("✅ All {} cancellation requests succeeded", success_count);
|
|
|
|
// Verify final state
|
|
sleep(Duration::from_millis(1000)).await;
|
|
let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
println!("✅ Final source status: {:?}", final_source.status);
|
|
|
|
// Cleanup
|
|
state.db.delete_source(user.id, source.id).await.unwrap();
|
|
state.db.delete_user(user.id).await.unwrap();
|
|
|
|
println!("🎉 Multiple cancellation requests test passed");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cancellation_without_active_sync() {
|
|
println!("🧪 Testing cancellation when no sync is active");
|
|
|
|
let state = create_test_app_state().await;
|
|
let user = create_test_user(&state).await;
|
|
let source = create_test_webdav_source(&state, user.id, "No Active Sync Source").await;
|
|
let app = create_test_app(state.clone());
|
|
|
|
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
|
|
|
|
println!("✅ Created test setup for no active sync test");
|
|
|
|
// Verify source is idle
|
|
let initial_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
assert_eq!(initial_source.status, SourceStatus::Idle);
|
|
println!("✅ Source is initially idle: {:?}", initial_source.status);
|
|
|
|
// Try to cancel sync when none is active
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
// Should succeed (idempotent behavior)
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
println!("✅ Cancellation without active sync succeeded");
|
|
|
|
// Verify source remains idle
|
|
let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
assert_eq!(final_source.status, SourceStatus::Idle);
|
|
println!("✅ Source remains idle after cancellation: {:?}", final_source.status);
|
|
|
|
// Cleanup
|
|
state.db.delete_source(user.id, source.id).await.unwrap();
|
|
state.db.delete_user(user.id).await.unwrap();
|
|
|
|
println!("🎉 Cancellation without active sync test passed");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_sync_status_monitoring_during_cancellation() {
|
|
println!("🧪 Testing sync status monitoring during cancellation");
|
|
|
|
let state = create_test_app_state().await;
|
|
let user = create_test_user(&state).await;
|
|
let source = create_test_webdav_source(&state, user.id, "Status Monitor Source").await;
|
|
let app = create_test_app(state.clone());
|
|
|
|
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
|
|
|
|
println!("✅ Created test setup for status monitoring test");
|
|
|
|
// Start sync
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!("✅ Sync started with status: {}", response.status());
|
|
|
|
// Monitor sync status before cancellation
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("GET")
|
|
.uri(format!("/api/sources/{}/sync/status", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
println!("✅ Sync status API accessible before cancellation");
|
|
|
|
// Cancel sync
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
println!("✅ Sync cancellation successful");
|
|
|
|
// Monitor sync status after cancellation
|
|
sleep(Duration::from_millis(500)).await;
|
|
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("GET")
|
|
.uri(format!("/api/sources/{}/sync/status", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
println!("✅ Sync status API accessible after cancellation");
|
|
|
|
// Check database state
|
|
let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
println!("✅ Final database status: {:?}", final_source.status);
|
|
|
|
// Cleanup
|
|
state.db.delete_source(user.id, source.id).await.unwrap();
|
|
state.db.delete_user(user.id).await.unwrap();
|
|
|
|
println!("🎉 Sync status monitoring during cancellation test passed");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cancellation_with_unauthorized_user() {
|
|
println!("🧪 Testing cancellation with unauthorized user");
|
|
|
|
let state = create_test_app_state().await;
|
|
let owner_user = create_test_user(&state).await;
|
|
let unauthorized_user = create_test_user(&state).await;
|
|
let source = create_test_webdav_source(&state, owner_user.id, "Unauthorized Test Source").await;
|
|
let app = create_test_app(state.clone());
|
|
|
|
let unauthorized_auth_header = create_auth_header(&unauthorized_user, &state.config.jwt_secret);
|
|
|
|
println!("✅ Created test setup with unauthorized user");
|
|
|
|
// Try to cancel sync with unauthorized user
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &unauthorized_auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
// Should return 404 (source not found for this user)
|
|
assert_eq!(response.status(), StatusCode::NOT_FOUND);
|
|
println!("✅ Unauthorized cancellation properly rejected with 404");
|
|
|
|
// Cleanup
|
|
state.db.delete_source(owner_user.id, source.id).await.unwrap();
|
|
state.db.delete_user(owner_user.id).await.unwrap();
|
|
state.db.delete_user(unauthorized_user.id).await.unwrap();
|
|
|
|
println!("🎉 Unauthorized user cancellation test passed");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cancellation_of_nonexistent_source() {
|
|
println!("🧪 Testing cancellation of nonexistent source");
|
|
|
|
let state = create_test_app_state().await;
|
|
let user = create_test_user(&state).await;
|
|
let app = create_test_app(state.clone());
|
|
|
|
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
|
|
let nonexistent_source_id = Uuid::new_v4();
|
|
|
|
println!("✅ Created test setup for nonexistent source test");
|
|
|
|
// Try to cancel sync for nonexistent source
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", nonexistent_source_id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
// Should return 404
|
|
assert_eq!(response.status(), StatusCode::NOT_FOUND);
|
|
println!("✅ Nonexistent source cancellation properly rejected with 404");
|
|
|
|
// Cleanup
|
|
state.db.delete_user(user.id).await.unwrap();
|
|
|
|
println!("🎉 Nonexistent source cancellation test passed");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_sync_start_cancel_start_sequence() {
|
|
println!("🧪 Testing sync start -> cancel -> start sequence");
|
|
|
|
let state = create_test_app_state().await;
|
|
let user = create_test_user(&state).await;
|
|
let source = create_test_webdav_source(&state, user.id, "Start Cancel Start Source").await;
|
|
let app = create_test_app(state.clone());
|
|
|
|
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
|
|
|
|
println!("✅ Created test setup for start-cancel-start sequence");
|
|
|
|
// Step 1: Start sync
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!("✅ First sync start: {}", response.status());
|
|
|
|
// Step 2: Wait briefly then cancel
|
|
sleep(Duration::from_millis(300)).await;
|
|
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
println!("✅ Sync cancellation successful");
|
|
|
|
// Step 3: Wait for cancellation to complete
|
|
sleep(Duration::from_millis(1000)).await;
|
|
|
|
// Step 4: Start sync again
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
// Should succeed or return expected error status
|
|
let status = response.status();
|
|
assert!(matches!(status, StatusCode::OK | StatusCode::CONFLICT | StatusCode::INTERNAL_SERVER_ERROR));
|
|
println!("✅ Second sync start after cancellation: {}", status);
|
|
|
|
// Step 5: Cancel the second sync
|
|
sleep(Duration::from_millis(300)).await;
|
|
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
println!("✅ Second cancellation successful");
|
|
|
|
// Verify final state
|
|
sleep(Duration::from_millis(1000)).await;
|
|
let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
println!("✅ Final source status: {:?}", final_source.status);
|
|
|
|
// Cleanup
|
|
state.db.delete_source(user.id, source.id).await.unwrap();
|
|
state.db.delete_user(user.id).await.unwrap();
|
|
|
|
println!("🎉 Start-cancel-start sequence test passed");
|
|
}
|
|
|
|
/// Test that validates sync actually stops working, not just changes status
|
|
#[tokio::test]
|
|
async fn test_sync_actually_stops_working() {
|
|
println!("🧪 Testing that sync cancellation actually stops sync work");
|
|
|
|
let state = create_test_app_state().await;
|
|
let user = create_test_user(&state).await;
|
|
let source = create_test_webdav_source(&state, user.id, "Actual Stop Test Source").await;
|
|
let app = create_test_app(state.clone());
|
|
|
|
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
|
|
|
|
println!("✅ Created test setup for actual sync stop validation");
|
|
|
|
// First check that progress tracker shows no active syncs
|
|
let initial_active_syncs = state.sync_progress_tracker.get_active_source_ids();
|
|
assert!(initial_active_syncs.is_empty(), "Should have no active syncs initially");
|
|
assert!(!state.sync_progress_tracker.is_syncing(source.id), "Source should not be syncing initially");
|
|
|
|
// Step 1: Start sync and verify it's actually registered as active
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!("✅ Sync start response: {}", response.status());
|
|
|
|
// Step 2: Wait for sync to actually start and be registered
|
|
let mut sync_became_active = false;
|
|
for attempt in 1..=20 { // Wait up to 2 seconds
|
|
sleep(Duration::from_millis(100)).await;
|
|
|
|
if state.sync_progress_tracker.is_syncing(source.id) {
|
|
sync_became_active = true;
|
|
println!("✅ Sync became active after {} attempts ({}ms)", attempt, attempt * 100);
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Verify sync actually became active
|
|
if !sync_became_active {
|
|
println!("⚠️ Sync never became active - checking database status");
|
|
let db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
println!("Database source status: {:?}", db_source.status);
|
|
|
|
// If sync didn't start due to no scheduler or other issues, that's fine for this test
|
|
// The important part is that we test stopping when sync IS active
|
|
if db_source.status != SourceStatus::Syncing {
|
|
println!("⚠️ Skipping actual stop test - sync never started (likely no scheduler available)");
|
|
// Cleanup
|
|
state.db.delete_source(user.id, source.id).await.unwrap();
|
|
state.db.delete_user(user.id).await.unwrap();
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Step 3: Verify sync is tracked in multiple places
|
|
let active_syncs_before_stop = state.sync_progress_tracker.get_active_source_ids();
|
|
println!("📊 Active syncs before stop: {:?}", active_syncs_before_stop);
|
|
|
|
let progress_before_stop = state.sync_progress_tracker.get_progress(source.id);
|
|
println!("📊 Progress before stop: {:?}", progress_before_stop);
|
|
|
|
let db_source_before_stop = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
println!("📊 Database status before stop: {:?}", db_source_before_stop.status);
|
|
|
|
// Step 4: Stop the sync
|
|
let stop_response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(stop_response.status(), StatusCode::OK);
|
|
println!("✅ Stop sync request successful");
|
|
|
|
// Step 5: Verify sync actually stops working
|
|
// Check progress tracker immediately (should be unregistered)
|
|
let progress_after_stop_immediate = state.sync_progress_tracker.get_progress(source.id);
|
|
println!("📊 Progress immediately after stop: {:?}", progress_after_stop_immediate);
|
|
|
|
// Wait a bit for all cleanup to complete
|
|
sleep(Duration::from_millis(500)).await;
|
|
|
|
let active_syncs_after_stop = state.sync_progress_tracker.get_active_source_ids();
|
|
println!("📊 Active syncs after stop: {:?}", active_syncs_after_stop);
|
|
|
|
let progress_after_stop = state.sync_progress_tracker.get_progress(source.id);
|
|
println!("📊 Progress after stop with delay: {:?}", progress_after_stop);
|
|
|
|
let db_source_after_stop = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
println!("📊 Database status after stop: {:?}", db_source_after_stop.status);
|
|
|
|
// Step 6: Assertions to verify sync actually stopped
|
|
|
|
// The source should no longer be tracked as actively syncing
|
|
assert!(!state.sync_progress_tracker.is_syncing(source.id),
|
|
"Source should not be tracked as syncing after stop");
|
|
|
|
// The source should not be in the active syncs list
|
|
assert!(!active_syncs_after_stop.contains(&source.id),
|
|
"Source should not be in active syncs list after stop");
|
|
|
|
// Database status should be Idle (not Syncing)
|
|
assert_eq!(db_source_after_stop.status, SourceStatus::Idle,
|
|
"Database status should be Idle after stop");
|
|
|
|
// Progress should either be None or show as not active
|
|
if let Some(progress) = progress_after_stop {
|
|
assert!(!progress.is_active, "Progress should show as not active after stop");
|
|
}
|
|
|
|
println!("✅ All sync stop validations passed");
|
|
|
|
// Step 7: Test that sync can be restarted after stop
|
|
sleep(Duration::from_millis(1000)).await; // Wait for complete cleanup
|
|
|
|
let restart_response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!("✅ Restart after stop response: {}", restart_response.status());
|
|
|
|
// Stop the restarted sync for cleanup
|
|
sleep(Duration::from_millis(200)).await;
|
|
let final_stop_response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!("✅ Final stop response: {}", final_stop_response.status());
|
|
|
|
// Cleanup
|
|
sleep(Duration::from_millis(500)).await;
|
|
state.db.delete_source(user.id, source.id).await.unwrap();
|
|
state.db.delete_user(user.id).await.unwrap();
|
|
|
|
println!("🎉 Actual sync stop validation test passed");
|
|
}
|
|
|
|
/// Test that validates sync cancellation during different phases
|
|
#[tokio::test]
|
|
async fn test_sync_cancellation_during_different_phases() {
|
|
println!("🧪 Testing sync cancellation during different phases");
|
|
|
|
let state = create_test_app_state().await;
|
|
let user = create_test_user(&state).await;
|
|
let source = create_test_webdav_source(&state, user.id, "Phase Cancellation Test Source").await;
|
|
let app = create_test_app(state.clone());
|
|
|
|
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
|
|
|
|
println!("✅ Created test setup for phase-based cancellation");
|
|
|
|
// Test cancellation at different timing intervals to catch different phases
|
|
let test_delays = vec![50, 150, 300, 500]; // Different delays in milliseconds
|
|
|
|
for (i, delay_ms) in test_delays.iter().enumerate() {
|
|
println!("🔄 Testing cancellation after {}ms delay (iteration {})", delay_ms, i + 1);
|
|
|
|
// Start sync
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!(" 📡 Sync start status: {}", response.status());
|
|
|
|
// Wait for the specified delay to let sync progress
|
|
sleep(Duration::from_millis(*delay_ms)).await;
|
|
|
|
// Check what phase we might be in (if any)
|
|
let progress_info = state.sync_progress_tracker.get_progress(source.id);
|
|
if let Some(progress) = &progress_info {
|
|
println!(" 📊 Cancelling during phase: {} ({})", progress.phase, progress.phase_description);
|
|
} else {
|
|
println!(" 📊 No progress info available - sync may not have started or already completed");
|
|
}
|
|
|
|
// Cancel the sync
|
|
let cancel_response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(cancel_response.status(), StatusCode::OK);
|
|
println!(" ✅ Cancellation successful");
|
|
|
|
// Verify cleanup
|
|
sleep(Duration::from_millis(300)).await;
|
|
|
|
assert!(!state.sync_progress_tracker.is_syncing(source.id),
|
|
"Source should not be syncing after cancellation in iteration {}", i + 1);
|
|
|
|
let db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
assert_eq!(db_source.status, SourceStatus::Idle,
|
|
"Source should be idle after cancellation in iteration {}", i + 1);
|
|
|
|
println!(" ✅ Cleanup verified for iteration {}", i + 1);
|
|
|
|
// Wait before next iteration to ensure complete cleanup
|
|
sleep(Duration::from_millis(500)).await;
|
|
}
|
|
|
|
// Cleanup
|
|
state.db.delete_source(user.id, source.id).await.unwrap();
|
|
state.db.delete_user(user.id).await.unwrap();
|
|
|
|
println!("🎉 Phase-based cancellation test passed");
|
|
}
|
|
|
|
/// Test resource cleanup validation after sync cancellation
|
|
#[tokio::test]
|
|
async fn test_resource_cleanup_after_cancellation() {
|
|
println!("🧪 Testing resource cleanup after sync cancellation");
|
|
|
|
let state = create_test_app_state().await;
|
|
let user = create_test_user(&state).await;
|
|
let source = create_test_webdav_source(&state, user.id, "Resource Cleanup Test Source").await;
|
|
let app = create_test_app(state.clone());
|
|
|
|
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
|
|
|
|
println!("✅ Created test setup for resource cleanup validation");
|
|
|
|
// Record initial state
|
|
let initial_active_syncs = state.sync_progress_tracker.get_active_source_ids();
|
|
let initial_progress = state.sync_progress_tracker.get_progress(source.id);
|
|
|
|
println!("📊 Initial active syncs: {:?}", initial_active_syncs);
|
|
println!("📊 Initial progress: {:?}", initial_progress);
|
|
|
|
// Start sync
|
|
let response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!("✅ Sync started with status: {}", response.status());
|
|
|
|
// Wait for sync to become active
|
|
sleep(Duration::from_millis(200)).await;
|
|
|
|
// Record active state
|
|
let active_syncs_during = state.sync_progress_tracker.get_active_source_ids();
|
|
let progress_during = state.sync_progress_tracker.get_progress(source.id);
|
|
let is_syncing_during = state.sync_progress_tracker.is_syncing(source.id);
|
|
|
|
println!("📊 Active syncs during: {:?}", active_syncs_during);
|
|
println!("📊 Progress during: {:?}", progress_during);
|
|
println!("📊 Is syncing during: {}", is_syncing_during);
|
|
|
|
// Cancel sync
|
|
let cancel_response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(cancel_response.status(), StatusCode::OK);
|
|
println!("✅ Sync cancellation successful");
|
|
|
|
// Wait for cleanup to complete
|
|
sleep(Duration::from_millis(1000)).await;
|
|
|
|
// Verify complete cleanup
|
|
let final_active_syncs = state.sync_progress_tracker.get_active_source_ids();
|
|
let final_progress = state.sync_progress_tracker.get_progress(source.id);
|
|
let is_syncing_final = state.sync_progress_tracker.is_syncing(source.id);
|
|
let db_source_final = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
|
|
println!("📊 Final active syncs: {:?}", final_active_syncs);
|
|
println!("📊 Final progress: {:?}", final_progress);
|
|
println!("📊 Is syncing final: {}", is_syncing_final);
|
|
println!("📊 Final DB status: {:?}", db_source_final.status);
|
|
|
|
// Assertions for complete cleanup
|
|
assert!(!final_active_syncs.contains(&source.id),
|
|
"Source should be removed from active syncs list");
|
|
|
|
assert!(!is_syncing_final,
|
|
"Progress tracker should not show source as syncing");
|
|
|
|
assert_eq!(db_source_final.status, SourceStatus::Idle,
|
|
"Database should show source as Idle");
|
|
|
|
// If progress exists, it should not be active
|
|
if let Some(progress) = final_progress {
|
|
assert!(!progress.is_active, "Any remaining progress should show as inactive");
|
|
}
|
|
|
|
// Test multiple rapid start/stop cycles to stress test cleanup
|
|
println!("🔄 Testing rapid start/stop cycles");
|
|
|
|
for cycle in 1..=3 {
|
|
println!(" 🔄 Cycle {}", cycle);
|
|
|
|
// Start
|
|
let start_response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!(" 📡 Start: {}", start_response.status());
|
|
|
|
// Brief wait
|
|
sleep(Duration::from_millis(100)).await;
|
|
|
|
// Stop
|
|
let stop_response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!(" 🛑 Stop: {}", stop_response.status());
|
|
|
|
// Verify cleanup after each cycle
|
|
sleep(Duration::from_millis(300)).await;
|
|
|
|
assert!(!state.sync_progress_tracker.is_syncing(source.id),
|
|
"Source should not be syncing after cycle {}", cycle);
|
|
|
|
let db_check = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
assert_eq!(db_check.status, SourceStatus::Idle,
|
|
"Source should be idle after cycle {}", cycle);
|
|
}
|
|
|
|
println!("✅ Rapid cycle cleanup verified");
|
|
|
|
// Cleanup
|
|
state.db.delete_source(user.id, source.id).await.unwrap();
|
|
state.db.delete_user(user.id).await.unwrap();
|
|
|
|
println!("🎉 Resource cleanup validation test passed");
|
|
}
|
|
|
|
/// Test that validates cancellation token propagation through sync layers
|
|
#[tokio::test]
|
|
async fn test_cancellation_token_propagation() {
|
|
println!("🧪 Testing cancellation token propagation through sync layers");
|
|
|
|
let state = create_test_app_state().await;
|
|
let user = create_test_user(&state).await;
|
|
let source = create_test_webdav_source(&state, user.id, "Token Propagation Test Source").await;
|
|
let app = create_test_app(state.clone());
|
|
|
|
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
|
|
|
|
// Create multiple sources to test concurrent cancellation handling
|
|
let source2 = create_test_webdav_source(&state, user.id, "Second Token Test Source").await;
|
|
let source3 = create_test_webdav_source(&state, user.id, "Third Token Test Source").await;
|
|
|
|
println!("✅ Created test setup with multiple sources for token propagation");
|
|
|
|
// Start multiple syncs concurrently
|
|
let sync_futures = vec![
|
|
app.clone().oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
),
|
|
app.clone().oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source2.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
),
|
|
app.clone().oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source3.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
),
|
|
];
|
|
|
|
let results = futures::future::join_all(sync_futures).await;
|
|
for (i, result) in results.iter().enumerate() {
|
|
if let Ok(response) = result {
|
|
println!("✅ Source {} sync start: {}", i + 1, response.status());
|
|
}
|
|
}
|
|
|
|
// Wait for syncs to potentially start
|
|
sleep(Duration::from_millis(300)).await;
|
|
|
|
// Record which sources are actually active
|
|
let active_before = state.sync_progress_tracker.get_active_source_ids();
|
|
println!("📊 Active syncs before cancellation: {:?}", active_before);
|
|
|
|
// Test individual cancellation (should only affect specific source)
|
|
let cancel_response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(cancel_response.status(), StatusCode::OK);
|
|
println!("✅ Individual source cancellation successful");
|
|
|
|
// Wait for cancellation to propagate
|
|
sleep(Duration::from_millis(500)).await;
|
|
|
|
// Verify that only the cancelled source stopped
|
|
let active_after_individual = state.sync_progress_tracker.get_active_source_ids();
|
|
println!("📊 Active syncs after individual cancellation: {:?}", active_after_individual);
|
|
|
|
// The cancelled source should not be active
|
|
assert!(!state.sync_progress_tracker.is_syncing(source.id),
|
|
"Cancelled source should not be syncing");
|
|
|
|
// Other sources might still be active (depending on implementation)
|
|
// The key test is that the cancellation was isolated to the correct source
|
|
|
|
// Cancel the remaining sources
|
|
let remaining_cancel_futures = vec![
|
|
app.clone().oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source2.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
),
|
|
app.clone().oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source3.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
),
|
|
];
|
|
|
|
let cancel_results = futures::future::join_all(remaining_cancel_futures).await;
|
|
for (i, result) in cancel_results.iter().enumerate() {
|
|
if let Ok(response) = result {
|
|
println!("✅ Remaining source {} cancel: {}", i + 2, response.status());
|
|
}
|
|
}
|
|
|
|
// Wait for all cancellations to complete
|
|
sleep(Duration::from_millis(1000)).await;
|
|
|
|
// Verify all sources are now idle
|
|
let final_active = state.sync_progress_tracker.get_active_source_ids();
|
|
println!("📊 Final active syncs: {:?}", final_active);
|
|
|
|
assert!(!state.sync_progress_tracker.is_syncing(source.id), "Source 1 should not be syncing");
|
|
assert!(!state.sync_progress_tracker.is_syncing(source2.id), "Source 2 should not be syncing");
|
|
assert!(!state.sync_progress_tracker.is_syncing(source3.id), "Source 3 should not be syncing");
|
|
|
|
// Verify database states
|
|
let db_sources = vec![
|
|
state.db.get_source(user.id, source.id).await.unwrap().unwrap(),
|
|
state.db.get_source(user.id, source2.id).await.unwrap().unwrap(),
|
|
state.db.get_source(user.id, source3.id).await.unwrap().unwrap(),
|
|
];
|
|
|
|
for (i, db_source) in db_sources.iter().enumerate() {
|
|
assert_eq!(db_source.status, SourceStatus::Idle,
|
|
"Database source {} should be idle", i + 1);
|
|
println!("📊 Database source {} status: {:?}", i + 1, db_source.status);
|
|
}
|
|
|
|
// Cleanup
|
|
state.db.delete_source(user.id, source.id).await.unwrap();
|
|
state.db.delete_source(user.id, source2.id).await.unwrap();
|
|
state.db.delete_source(user.id, source3.id).await.unwrap();
|
|
state.db.delete_user(user.id).await.unwrap();
|
|
|
|
println!("🎉 Cancellation token propagation test passed");
|
|
}
|
|
|
|
/// Comprehensive test that validates the complete sync cancellation workflow
|
|
/// This is the main test that covers all aspects of sync cancellation
|
|
#[tokio::test]
|
|
async fn test_comprehensive_sync_cancellation_validation() {
|
|
println!("🧪 COMPREHENSIVE TEST: Complete sync cancellation validation");
|
|
|
|
let state = create_test_app_state().await;
|
|
let user = create_test_user(&state).await;
|
|
let source = create_test_webdav_source(&state, user.id, "Comprehensive Cancellation Test").await;
|
|
let app = create_test_app(state.clone());
|
|
|
|
let auth_header = create_auth_header(&user, &state.config.jwt_secret);
|
|
|
|
println!("✅ Created comprehensive test environment");
|
|
|
|
// PHASE 1: Validate initial state
|
|
println!("📝 PHASE 1: Initial state validation");
|
|
|
|
let initial_db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
let initial_active_syncs = state.sync_progress_tracker.get_active_source_ids();
|
|
let initial_is_syncing = state.sync_progress_tracker.is_syncing(source.id);
|
|
let initial_progress = state.sync_progress_tracker.get_progress(source.id);
|
|
|
|
assert_eq!(initial_db_source.status, SourceStatus::Idle, "Initial DB status should be Idle");
|
|
assert!(initial_active_syncs.is_empty(), "Initial active syncs should be empty");
|
|
assert!(!initial_is_syncing, "Initial sync state should be false");
|
|
assert!(initial_progress.is_none(), "Initial progress should be None");
|
|
|
|
println!("✅ PHASE 1 PASSED: All initial states correct");
|
|
|
|
// PHASE 2: Start sync and validate activation
|
|
println!("📝 PHASE 2: Sync activation validation");
|
|
|
|
let start_response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!("📡 Sync start response: {}", start_response.status());
|
|
|
|
// Wait for sync to activate and check multiple indicators
|
|
let mut sync_activation_verified = false;
|
|
for attempt in 1..=30 { // Wait up to 3 seconds
|
|
sleep(Duration::from_millis(100)).await;
|
|
|
|
let db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
let is_syncing = state.sync_progress_tracker.is_syncing(source.id);
|
|
let active_syncs = state.sync_progress_tracker.get_active_source_ids();
|
|
|
|
if db_source.status == SourceStatus::Syncing || is_syncing || active_syncs.contains(&source.id) {
|
|
sync_activation_verified = true;
|
|
println!("✅ Sync activation verified after {} attempts:", attempt);
|
|
println!(" 📊 DB Status: {:?}", db_source.status);
|
|
println!(" 📊 Is Syncing: {}", is_syncing);
|
|
println!(" 📊 Active Syncs: {:?}", active_syncs);
|
|
break;
|
|
}
|
|
}
|
|
|
|
if !sync_activation_verified {
|
|
println!("⚠️ PHASE 2 CONDITIONAL PASS: Sync never activated (likely no scheduler)");
|
|
// Cleanup and exit gracefully
|
|
state.db.delete_source(user.id, source.id).await.unwrap();
|
|
state.db.delete_user(user.id).await.unwrap();
|
|
return;
|
|
}
|
|
|
|
println!("✅ PHASE 2 PASSED: Sync activation verified");
|
|
|
|
// PHASE 3: Validate active sync state across all systems
|
|
println!("📝 PHASE 3: Active sync state validation");
|
|
|
|
let active_db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
let active_is_syncing = state.sync_progress_tracker.is_syncing(source.id);
|
|
let active_syncs_list = state.sync_progress_tracker.get_active_source_ids();
|
|
let active_progress = state.sync_progress_tracker.get_progress(source.id);
|
|
|
|
println!("📊 Active state summary:");
|
|
println!(" 📊 DB Status: {:?}", active_db_source.status);
|
|
println!(" 📊 Is Syncing: {}", active_is_syncing);
|
|
println!(" 📊 Active Syncs: {:?}", active_syncs_list);
|
|
println!(" 📊 Progress Active: {:?}", active_progress.as_ref().map(|p| p.is_active));
|
|
|
|
// At least one indicator should show sync is active
|
|
let sync_indicators_active = active_db_source.status == SourceStatus::Syncing ||
|
|
active_is_syncing ||
|
|
active_syncs_list.contains(&source.id);
|
|
|
|
assert!(sync_indicators_active, "At least one sync indicator should show active state");
|
|
|
|
println!("✅ PHASE 3 PASSED: Active sync state validated");
|
|
|
|
// PHASE 4: Cancel sync and validate immediate response
|
|
println!("📝 PHASE 4: Sync cancellation execution");
|
|
|
|
let cancel_response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(cancel_response.status(), StatusCode::OK);
|
|
println!("✅ PHASE 4 PASSED: Cancellation request successful");
|
|
|
|
// PHASE 5: Validate cancellation propagation and cleanup
|
|
println!("📝 PHASE 5: Cancellation cleanup validation");
|
|
|
|
// Check immediate state (some cleanup might be instant)
|
|
let immediate_is_syncing = state.sync_progress_tracker.is_syncing(source.id);
|
|
let immediate_active_syncs = state.sync_progress_tracker.get_active_source_ids();
|
|
|
|
println!("📊 Immediate post-cancel state:");
|
|
println!(" 📊 Is Syncing: {}", immediate_is_syncing);
|
|
println!(" 📊 Active Syncs: {:?}", immediate_active_syncs);
|
|
|
|
// Wait for complete cleanup
|
|
sleep(Duration::from_millis(1500)).await;
|
|
|
|
let final_db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap();
|
|
let final_is_syncing = state.sync_progress_tracker.is_syncing(source.id);
|
|
let final_active_syncs = state.sync_progress_tracker.get_active_source_ids();
|
|
let final_progress = state.sync_progress_tracker.get_progress(source.id);
|
|
|
|
println!("📊 Final post-cancel state:");
|
|
println!(" 📊 DB Status: {:?}", final_db_source.status);
|
|
println!(" 📊 Is Syncing: {}", final_is_syncing);
|
|
println!(" 📊 Active Syncs: {:?}", final_active_syncs);
|
|
println!(" 📊 Progress: {:?}", final_progress.as_ref().map(|p| (p.is_active, &p.phase)));
|
|
|
|
// CRITICAL ASSERTIONS: These must all pass for proper cancellation
|
|
|
|
assert_eq!(final_db_source.status, SourceStatus::Idle,
|
|
"CRITICAL: Database status must be Idle after cancellation");
|
|
|
|
assert!(!final_is_syncing,
|
|
"CRITICAL: Progress tracker must not show source as syncing");
|
|
|
|
assert!(!final_active_syncs.contains(&source.id),
|
|
"CRITICAL: Source must not be in active syncs list");
|
|
|
|
if let Some(progress) = final_progress {
|
|
assert!(!progress.is_active,
|
|
"CRITICAL: Any remaining progress must show as inactive");
|
|
}
|
|
|
|
println!("✅ PHASE 5 PASSED: Complete cancellation cleanup verified");
|
|
|
|
// PHASE 6: Validate restart capability after cancellation
|
|
println!("📝 PHASE 6: Post-cancellation restart validation");
|
|
|
|
sleep(Duration::from_millis(500)).await; // Ensure complete cleanup
|
|
|
|
let restart_response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!("📡 Restart response: {}", restart_response.status());
|
|
|
|
// The restart should succeed (or fail with expected reasons, not due to lingering state)
|
|
let acceptable_restart_statuses = [StatusCode::OK, StatusCode::CONFLICT,
|
|
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::NOT_IMPLEMENTED];
|
|
assert!(acceptable_restart_statuses.contains(&restart_response.status()),
|
|
"Restart should succeed or fail with expected status, got: {}", restart_response.status());
|
|
|
|
// Clean up the restarted sync
|
|
sleep(Duration::from_millis(200)).await;
|
|
let final_cleanup_response = app
|
|
.clone()
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(format!("/api/sources/{}/sync/stop", source.id))
|
|
.header("Authorization", &auth_header)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
println!("📡 Final cleanup response: {}", final_cleanup_response.status());
|
|
|
|
println!("✅ PHASE 6 PASSED: Restart capability validated");
|
|
|
|
// Final cleanup
|
|
sleep(Duration::from_millis(500)).await;
|
|
state.db.delete_source(user.id, source.id).await.unwrap();
|
|
state.db.delete_user(user.id).await.unwrap();
|
|
|
|
println!("🎉 COMPREHENSIVE TEST PASSED: Complete sync cancellation validation successful");
|
|
println!(" ✅ All 6 phases validated successfully");
|
|
println!(" ✅ Sync actually stops working (not just status changes)");
|
|
println!(" ✅ Resources properly cleaned up");
|
|
println!(" ✅ System remains in consistent state");
|
|
} |