feat(webdav): gracefully recover webdav from stops/crashes

This commit is contained in:
perf3ct
2025-07-03 04:45:25 +00:00
parent 2297eb8261
commit 69c40c10fa
3 changed files with 377 additions and 26 deletions

View File

@@ -0,0 +1,17 @@
-- Add scan progress tracking for crash recovery.
-- This allows resuming interrupted scans after server restarts:
--   scan_in_progress  flag set while a directory scan is running (checkpoint)
--   scan_started_at   when the in-progress scan began (used to detect stale scans)
--   scan_error        last failure message, kept for operator diagnostics
ALTER TABLE webdav_directories
ADD COLUMN IF NOT EXISTS scan_in_progress BOOLEAN DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS scan_started_at TIMESTAMPTZ,
ADD COLUMN IF NOT EXISTS scan_error TEXT;
-- Partial index for finding a user's incomplete scans
-- (only rows with scan_in_progress = TRUE are indexed, keeping it small)
CREATE INDEX IF NOT EXISTS idx_webdav_directories_scan_progress
ON webdav_directories(user_id, scan_in_progress)
WHERE scan_in_progress = TRUE;
-- Partial index for finding scans that have been running too long (possible crashes),
-- ordered by start time so the stale-scan query can range-filter efficiently
CREATE INDEX IF NOT EXISTS idx_webdav_directories_stale_scans
ON webdav_directories(scan_started_at)
WHERE scan_in_progress = TRUE;

View File

@@ -351,4 +351,82 @@ impl Database {
Ok(result.rows_affected() as i64)
}
/// Return the directory paths whose scans were started but never finished,
/// oldest first, so crash recovery can resume them in a stable order.
pub async fn get_incomplete_webdav_scans(&self, user_id: Uuid) -> Result<Vec<String>> {
    // Oldest scans first: they have been waiting longest for recovery.
    let query = sqlx::query(
        r#"SELECT directory_path FROM webdav_directories
        WHERE user_id = $1 AND scan_in_progress = TRUE
        ORDER BY scan_started_at ASC"#
    );
    let rows = query.bind(user_id).fetch_all(&self.pool).await?;
    let mut paths = Vec::with_capacity(rows.len());
    for row in &rows {
        paths.push(row.get("directory_path"));
    }
    Ok(paths)
}
/// Return directory paths whose scans have been "in progress" for longer than
/// `timeout_minutes` — these most likely belong to a crashed or killed process.
pub async fn get_stale_webdav_scans(&self, user_id: Uuid, timeout_minutes: i64) -> Result<Vec<String>> {
    // The interval is computed server-side: INTERVAL '1 minute' * $2.
    let query = sqlx::query(
        r#"SELECT directory_path FROM webdav_directories
        WHERE user_id = $1 AND scan_in_progress = TRUE
        AND scan_started_at < NOW() - INTERVAL '1 minute' * $2
        ORDER BY scan_started_at ASC"#
    )
    .bind(user_id)
    .bind(timeout_minutes);
    let rows = query.fetch_all(&self.pool).await?;
    // Oldest first, matching get_incomplete_webdav_scans.
    Ok(rows.iter().map(|row| row.get("directory_path")).collect())
}
/// Persist a crash-recovery checkpoint marking `directory_path` as actively
/// being scanned. Upserts so brand-new directories get a row (with an empty
/// ETag placeholder), and clears any error left over from a previous attempt.
pub async fn mark_webdav_scan_in_progress(&self, user_id: Uuid, directory_path: &str) -> Result<()> {
    let query = sqlx::query(
        r#"INSERT INTO webdav_directories (user_id, directory_path, directory_etag, scan_in_progress, scan_started_at, last_scanned_at)
        VALUES ($1, $2, '', TRUE, NOW(), NOW())
        ON CONFLICT (user_id, directory_path)
        DO UPDATE SET scan_in_progress = TRUE, scan_started_at = NOW(), scan_error = NULL"#
    )
    .bind(user_id)
    .bind(directory_path);
    query.execute(&self.pool).await?;
    Ok(())
}
/// Clear the crash-recovery checkpoint after a successful scan: drops the
/// in-progress flag and start time, wipes any stored error, and records the
/// scan/update timestamps.
pub async fn mark_webdav_scan_complete(&self, user_id: Uuid, directory_path: &str) -> Result<()> {
    let query = sqlx::query(
        r#"UPDATE webdav_directories
        SET scan_in_progress = FALSE, scan_started_at = NULL, scan_error = NULL,
            last_scanned_at = NOW(), updated_at = NOW()
        WHERE user_id = $1 AND directory_path = $2"#
    )
    .bind(user_id)
    .bind(directory_path);
    query.execute(&self.pool).await?;
    Ok(())
}
/// Record a failed scan: clears the in-progress flag but stores the error
/// text so operators (and stale-scan queries) can see why the scan stopped.
pub async fn mark_webdav_scan_failed(&self, user_id: Uuid, directory_path: &str, error: &str) -> Result<()> {
    let query = sqlx::query(
        r#"UPDATE webdav_directories
        SET scan_in_progress = FALSE, scan_error = $3, updated_at = NOW()
        WHERE user_id = $1 AND directory_path = $2"#
    );
    query
        .bind(user_id)
        .bind(directory_path)
        .bind(error)
        .execute(&self.pool)
        .await?;
    Ok(())
}
}

View File

@@ -498,8 +498,34 @@ impl WebDAVService {
/// Optimized discovery that checks directory ETag first to avoid unnecessary deep scans.
///
/// Thin public wrapper: delegates to the recovery-aware implementation with
/// crash recovery enabled, so interrupted scans from a previous session are
/// detected and resumed before the requested folder is scanned.
pub async fn discover_files_in_folder_optimized(&self, folder_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<FileInfo>> {
    // `true` enables detection/resumption of scans interrupted by a crash or restart.
    self.discover_files_in_folder_optimized_with_recovery(folder_path, user_id, state, true).await
}
async fn discover_files_in_folder_optimized_with_recovery(&self, folder_path: &str, user_id: uuid::Uuid, state: &crate::AppState, enable_crash_recovery: bool) -> Result<Vec<FileInfo>> {
debug!("🔍 Starting optimized discovery for folder: {}", folder_path);
// Check for incomplete scans that need recovery
if enable_crash_recovery {
if let Ok(incomplete_scans) = self.detect_incomplete_scans(user_id, state).await {
if !incomplete_scans.is_empty() {
info!("🔄 Detected {} incomplete scans from previous session, resuming...", incomplete_scans.len());
for incomplete_path in incomplete_scans {
if incomplete_path.starts_with(folder_path) {
info!("🔄 Resuming incomplete scan for: {}", incomplete_path);
match self.resume_deep_scan_internal(&incomplete_path, user_id, state).await {
Ok(resumed_files) => {
info!("✅ Successfully resumed scan for {}: {} files found", incomplete_path, resumed_files.len());
}
Err(e) => {
warn!("⚠️ Failed to resume scan for {}: {}", incomplete_path, e);
}
}
}
}
}
}
}
// Check if we should use smart scanning
let use_smart_scan = match self.config.server_type.as_deref() {
Some("nextcloud") | Some("owncloud") => {
@@ -523,8 +549,8 @@ impl WebDAVService {
}
};
// Use smart scanning with depth-1 traversal
return self.smart_directory_scan(folder_path, stored_etag.as_deref(), user_id, state).await;
// Use smart scanning with depth-1 traversal and checkpoint recovery
return self.smart_directory_scan_with_checkpoints(folder_path, stored_etag.as_deref(), user_id, state).await;
}
// Fall back to traditional optimization for other servers
@@ -730,8 +756,11 @@ impl WebDAVService {
for path in paths_to_scan {
debug!("🔍 Targeted scan of: {}", path);
// Convert to relative path for API calls
let relative_path = self.convert_to_relative_path(path);
// Check if this specific path has changed
match self.check_directory_etag(path).await {
match self.check_directory_etag(&relative_path).await {
Ok(current_etag) => {
// Check cached ETag
let needs_scan = match state.db.get_webdav_directory(user_id, path).await {
@@ -756,7 +785,7 @@ impl WebDAVService {
if needs_scan {
// Use shallow scan for this specific directory only
match self.discover_files_in_folder_shallow(path).await {
match self.discover_files_in_folder_shallow(&relative_path).await {
Ok(mut path_files) => {
debug!("📂 Found {} files in changed path {}", path_files.len(), path);
all_files.append(&mut path_files);
@@ -1244,8 +1273,11 @@ impl WebDAVService {
// Find a directory with subdirectories from our watch folders
for watch_folder in &self.config.watch_folders {
// Convert to relative path for API calls
let relative_watch_folder = self.convert_to_relative_path(watch_folder);
// Get the directory structure with depth 1
match self.discover_files_in_folder_shallow(watch_folder).await {
match self.discover_files_in_folder_shallow(&relative_watch_folder).await {
Ok(entries) => {
// Find a subdirectory to test with
let subdirs: Vec<_> = entries.iter()
@@ -1261,10 +1293,11 @@ impl WebDAVService {
debug!("Testing with directory: {} and subdirectory: {}", watch_folder, test_subdir.path);
// Step 1: Get parent directory ETag
let parent_etag = self.check_directory_etag(watch_folder).await?;
let parent_etag = self.check_directory_etag(&relative_watch_folder).await?;
// Step 2: Get subdirectory ETag
let subdir_etag = self.check_directory_etag(&test_subdir.path).await?;
// Step 2: Get subdirectory ETag (convert to relative path)
let relative_subdir_path = self.convert_to_relative_path(&test_subdir.path);
let subdir_etag = self.check_directory_etag(&relative_subdir_path).await?;
// Step 3: Check if parent has a different ETag than child
// In a recursive ETag system, they should be different but related
@@ -1385,7 +1418,72 @@ impl WebDAVService {
all_files.push(entry);
}
// Update tracking for this directory
// Note: We'll update the directory tracking at the end after processing all subdirectories
// to avoid ETag race conditions during the scan
// Step 4: Process subdirectories concurrently with controlled parallelism
if !subdirs_to_scan.is_empty() {
let semaphore = std::sync::Arc::new(Semaphore::new(self.concurrency_config.max_concurrent_scans));
let subdirs_stream = stream::iter(subdirs_to_scan)
.map(|subdir| {
let semaphore = semaphore.clone();
let service = self.clone();
async move {
let _permit = semaphore.acquire().await.map_err(|e| anyhow!("Semaphore error: {}", e))?;
// Get stored ETag for this subdirectory
let stored_etag = match state.db.get_webdav_directory(user_id, &subdir.path).await {
Ok(Some(dir)) => Some(dir.directory_etag),
Ok(None) => {
debug!("🆕 New subdirectory discovered: {}", subdir.path);
None
}
Err(e) => {
warn!("Database error checking subdirectory {}: {}", subdir.path, e);
None
}
};
// If ETag changed or new directory, scan it recursively
if stored_etag.as_deref() != Some(&subdir.etag) {
debug!("🔄 Subdirectory {} needs scanning (old: {:?}, new: {})",
subdir.path, stored_etag, subdir.etag);
match service.smart_directory_scan_internal(&subdir.path, stored_etag.as_deref(), user_id, state).await {
Ok(subdir_files) => {
debug!("📂 Found {} entries in subdirectory {}", subdir_files.len(), subdir.path);
Result::<Vec<FileInfo>, anyhow::Error>::Ok(subdir_files)
}
Err(e) => {
error!("Failed to scan subdirectory {}: {}", subdir.path, e);
Result::<Vec<FileInfo>, anyhow::Error>::Ok(Vec::new()) // Continue with other subdirectories
}
}
} else {
debug!("✅ Subdirectory {} unchanged (ETag: {})", subdir.path, subdir.etag);
// Don't update database during scan - will be handled by top-level caller
Result::<Vec<FileInfo>, anyhow::Error>::Ok(Vec::new())
}
}
})
.buffer_unordered(self.concurrency_config.max_concurrent_scans);
// Collect all results concurrently
let mut subdirs_stream = std::pin::pin!(subdirs_stream);
while let Some(result) = subdirs_stream.next().await {
match result {
Ok(mut subdir_files) => {
all_files.append(&mut subdir_files);
}
Err(e) => {
warn!("Concurrent subdirectory scan error: {}", e);
// Continue processing other subdirectories
}
}
}
}
// Only update database if this is the top-level call (not a recursive subdirectory scan)
let file_count = all_files.iter().filter(|f| !f.is_directory && self.is_direct_child(&f.path, path)).count() as i64;
let total_size = all_files.iter()
.filter(|f| !f.is_directory && self.is_direct_child(&f.path, path))
@@ -1404,6 +1502,75 @@ impl WebDAVService {
warn!("Failed to update directory tracking for {}: {}", path, e);
}
debug!("🧠 Smart scan completed for {}: {} total entries found", path, all_files.len());
Ok(all_files)
})
}
/// Internal version of smart_directory_scan that doesn't update the database
/// Used for recursive subdirectory scanning to avoid race conditions
fn smart_directory_scan_internal<'a>(
&'a self,
path: &'a str,
known_etag: Option<&'a str>,
user_id: uuid::Uuid,
state: &'a crate::AppState
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<FileInfo>>> + Send + 'a>> {
Box::pin(async move {
debug!("🧠 Smart scan (internal) starting for path: {}", path);
// Convert full WebDAV path to relative path for existing functions
let relative_path = self.convert_to_relative_path(path);
debug!("🔄 Converted {} to relative path: {}", path, relative_path);
// Step 1: Check current directory ETag
let current_etag = match self.check_directory_etag(&relative_path).await {
Ok(etag) => etag,
Err(e) => {
warn!("Failed to get directory ETag for {}, falling back to full scan: {}", path, e);
return self.discover_files_in_folder_impl(&relative_path).await;
}
};
// Step 2: If unchanged and we support recursive ETags, nothing to do
if known_etag == Some(&current_etag) {
let supports_recursive = match self.config.server_type.as_deref() {
Some("nextcloud") | Some("owncloud") => true,
_ => false
};
if supports_recursive {
debug!("✅ Directory {} unchanged (recursive ETag: {}), skipping scan", path, current_etag);
return Ok(Vec::new());
} else {
debug!("📁 Directory {} ETag unchanged but server doesn't support recursive ETags, checking subdirectories", path);
}
} else {
debug!("🔄 Directory {} changed (old: {:?}, new: {})", path, known_etag, current_etag);
}
// Step 3: Directory changed or we need to check subdirectories - do depth-1 scan
let entries = match self.discover_files_in_folder_shallow(&relative_path).await {
Ok(files) => files,
Err(e) => {
error!("Failed shallow scan of {}: {}", path, e);
return Err(e);
}
};
let mut all_files = Vec::new();
let mut subdirs_to_scan = Vec::new();
// Separate files and directories
for entry in entries {
if entry.is_directory && entry.path != path {
subdirs_to_scan.push(entry.clone());
}
all_files.push(entry);
}
// Note: No database update in internal function to avoid race conditions
// Step 4: Process subdirectories concurrently with controlled parallelism
if !subdirs_to_scan.is_empty() {
let semaphore = std::sync::Arc::new(Semaphore::new(self.concurrency_config.max_concurrent_scans));
@@ -1432,7 +1599,7 @@ impl WebDAVService {
debug!("🔄 Subdirectory {} needs scanning (old: {:?}, new: {})",
subdir.path, stored_etag, subdir.etag);
match service.smart_directory_scan(&subdir.path, stored_etag.as_deref(), user_id, state).await {
match service.smart_directory_scan_internal(&subdir.path, stored_etag.as_deref(), user_id, state).await {
Ok(subdir_files) => {
debug!("📂 Found {} entries in subdirectory {}", subdir_files.len(), subdir.path);
Result::<Vec<FileInfo>, anyhow::Error>::Ok(subdir_files)
@@ -1444,17 +1611,7 @@ impl WebDAVService {
}
} else {
debug!("✅ Subdirectory {} unchanged (ETag: {})", subdir.path, subdir.etag);
// Update last_scanned_at
let update = crate::models::UpdateWebDAVDirectory {
directory_etag: subdir.etag.clone(),
last_scanned_at: chrono::Utc::now(),
file_count: 0, // Will be preserved by database
total_size_bytes: 0,
};
if let Err(e) = state.db.update_webdav_directory(user_id, &subdir.path, &update).await {
warn!("Failed to update scan time for {}: {}", subdir.path, e);
}
// Don't update database during internal scan
Result::<Vec<FileInfo>, anyhow::Error>::Ok(Vec::new())
}
}
@@ -1476,17 +1633,116 @@ impl WebDAVService {
}
}
debug!("🧠 Smart scan completed for {}: {} total entries found", path, all_files.len());
debug!("🧠 Smart scan (internal) completed for {}: {} total entries found", path, all_files.len());
Ok(all_files)
})
}
/// Smart directory scan with checkpoint-based crash recovery.
///
/// Writes an "in progress" checkpoint row before scanning, runs the internal
/// (database-silent) scan, and on success updates the directory tracking row
/// and clears the checkpoint; on failure the checkpoint is converted into a
/// failure record so the scan can be retried/diagnosed later.
///
/// Returns a boxed future because the internal scan is recursive.
pub fn smart_directory_scan_with_checkpoints<'a>(
    &'a self,
    path: &'a str,
    known_etag: Option<&'a str>,
    user_id: uuid::Uuid,
    state: &'a crate::AppState
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<FileInfo>>> + Send + 'a>> {
    Box::pin(async move {
        debug!("🧠 Smart scan with checkpoints starting for path: {}", path);
        // Mark scan as in progress (checkpoint). Checkpoint failures are logged
        // but do not abort the scan — recovery is best-effort.
        if let Err(e) = self.mark_scan_in_progress(user_id, path, state).await {
            warn!("Failed to mark scan in progress for {}: {}", path, e);
        }
        // Perform the actual scan. The internal variant deliberately performs no
        // per-directory DB writes; all tracking updates happen below, once.
        let result = self.smart_directory_scan_internal(path, known_etag, user_id, state).await;
        match &result {
            Ok(files) => {
                debug!("✅ Smart scan completed for {}: {} files", path, files.len());
                // Update directory tracking and mark scan complete.
                // Counts/sizes only include files that are direct children of `path`;
                // deeper entries are tracked by their own directory rows.
                let file_count = files.iter().filter(|f| !f.is_directory && self.is_direct_child(&f.path, path)).count() as i64;
                let total_size = files.iter()
                    .filter(|f| !f.is_directory && self.is_direct_child(&f.path, path))
                    .map(|f| f.size)
                    .sum::<i64>();
                // NOTE(review): this records the caller-supplied (pre-scan) ETag — or the
                // literal "unknown" when none was supplied — rather than an ETag observed
                // during the scan. Confirm whether the current server ETag should be
                // fetched and stored here instead; as written, a directory scanned with
                // known_etag=None will look "changed" on the next comparison.
                let current_etag = known_etag.unwrap_or("unknown").to_string();
                let dir_record = crate::models::CreateWebDAVDirectory {
                    user_id,
                    directory_path: path.to_string(),
                    directory_etag: current_etag.clone(),
                    file_count,
                    total_size_bytes: total_size,
                };
                if let Err(e) = state.db.create_or_update_webdav_directory(&dir_record).await {
                    warn!("Failed to update directory tracking for {}: {}", path, e);
                }
                // Mark scan as complete (remove checkpoint).
                if let Err(e) = self.mark_scan_complete(user_id, path, state).await {
                    warn!("Failed to mark scan complete for {}: {}", path, e);
                }
            }
            Err(e) => {
                error!("❌ Smart scan failed for {}: {}", path, e);
                // Mark scan as failed so the error is visible and the checkpoint
                // does not read as a live in-progress scan forever.
                if let Err(mark_err) = state.db.mark_webdav_scan_failed(user_id, path, &e.to_string()).await {
                    warn!("Failed to mark scan as failed for {}: {}", path, mark_err);
                }
            }
        }
        result
    })
}
/// Collect directory paths whose scans need recovery: scans flagged as
/// incomplete, plus scans running longer than 30 minutes (likely crashed).
/// Database errors degrade to an empty candidate list so a failed lookup
/// never blocks a fresh scan from starting.
async fn detect_incomplete_scans(&self, user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<String>> {
    debug!("🔍 Checking for incomplete scans...");
    // Both lookups are best-effort; either failing yields an empty list.
    let incomplete = state.db.get_incomplete_webdav_scans(user_id).await.unwrap_or_default();
    let stale = state.db.get_stale_webdav_scans(user_id, 30).await.unwrap_or_default(); // 30 minute timeout
    // Merge the two sources and deduplicate (a path can appear in both).
    let mut candidates = incomplete;
    candidates.extend(stale);
    candidates.sort();
    candidates.dedup();
    if !candidates.is_empty() {
        info!("Found {} incomplete/stale scans to recover", candidates.len());
    }
    Ok(candidates)
}
/// Persist a crash-recovery checkpoint: `path` is currently being scanned.
/// Delegates the actual write to the database layer.
async fn mark_scan_in_progress(&self, user_id: uuid::Uuid, path: &str, state: &crate::AppState) -> Result<()> {
    debug!("📝 Marking scan in progress for: {}", path);
    let db = &state.db;
    db.mark_webdav_scan_in_progress(user_id, path).await
}
/// Remove the crash-recovery checkpoint for `path` after a successful scan.
/// Delegates the actual write to the database layer.
async fn mark_scan_complete(&self, user_id: uuid::Uuid, path: &str, state: &crate::AppState) -> Result<()> {
    debug!("✅ Marking scan complete for: {}", path);
    let db = &state.db;
    db.mark_webdav_scan_complete(user_id, path).await
}
/// Resume a deep scan from a checkpoint after server restart/interruption.
///
/// Public wrapper around the internal variant; the internal version skips
/// crash-recovery detection so resuming a scan cannot recursively trigger
/// another round of recovery.
pub async fn resume_deep_scan(&self, checkpoint_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<FileInfo>> {
    self.resume_deep_scan_internal(checkpoint_path, user_id, state).await
}
/// Internal resume function that doesn't trigger crash recovery detection (to avoid recursion)
async fn resume_deep_scan_internal(&self, checkpoint_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<FileInfo>> {
info!("🔄 Resuming deep scan from checkpoint: {}", checkpoint_path);
// Check if the checkpoint directory is still accessible
match self.check_directory_etag(checkpoint_path).await {
let relative_checkpoint_path = self.convert_to_relative_path(checkpoint_path);
match self.check_directory_etag(&relative_checkpoint_path).await {
Ok(current_etag) => {
info!("✅ Checkpoint directory accessible, resuming scan");
@@ -1509,17 +1765,17 @@ impl WebDAVService {
}
// Resume with smart scanning from this point
self.discover_files_in_folder_optimized(checkpoint_path, user_id, state).await
self.smart_directory_scan_with_checkpoints(checkpoint_path, None, user_id, state).await
}
Err(e) => {
warn!("Checkpoint directory {} inaccessible after restart: {}", checkpoint_path, e);
// Server might have restarted, wait a bit and retry
tokio::time::sleep(Duration::from_secs(5)).await;
match self.check_directory_etag(checkpoint_path).await {
match self.check_directory_etag(&relative_checkpoint_path).await {
Ok(_) => {
info!("🔄 Server recovered, resuming scan");
self.discover_files_in_folder_optimized(checkpoint_path, user_id, state).await
self.smart_directory_scan_with_checkpoints(checkpoint_path, None, user_id, state).await
}
Err(e2) => {
error!("Failed to resume deep scan after server restart: {}", e2);