diff --git a/migrations/20250709000001_create_new_queue_stats_function.sql b/migrations/20250709000001_create_new_queue_stats_function.sql new file mode 100644 index 0000000..0325c93 --- /dev/null +++ b/migrations/20250709000001_create_new_queue_stats_function.sql @@ -0,0 +1,29 @@ +-- Create a new queue statistics function with a different name to avoid conflicts +-- This completely avoids the old get_ocr_queue_stats function issues + +CREATE FUNCTION get_queue_statistics() +RETURNS TABLE ( + pending_count BIGINT, + processing_count BIGINT, + failed_count BIGINT, + completed_today BIGINT, + avg_wait_time_minutes DOUBLE PRECISION, + oldest_pending_minutes DOUBLE PRECISION +) AS $$ +BEGIN + RETURN QUERY + SELECT + COUNT(*) FILTER (WHERE status = 'pending') as pending_count, + COUNT(*) FILTER (WHERE status = 'processing') as processing_count, + COUNT(*) FILTER (WHERE status = 'failed' AND attempts >= max_attempts) as failed_count, + -- Get completed_today from documents table instead of ocr_queue + (SELECT COUNT(*)::BIGINT + FROM documents + WHERE ocr_status = 'completed' + AND updated_at >= CURRENT_DATE + AND updated_at < CURRENT_DATE + INTERVAL '1 day') as completed_today, + CAST(AVG(EXTRACT(EPOCH FROM (COALESCE(started_at, NOW()) - created_at))/60) FILTER (WHERE status IN ('processing', 'completed')) AS DOUBLE PRECISION) as avg_wait_time_minutes, + CAST(MAX(EXTRACT(EPOCH FROM (NOW() - created_at))/60) FILTER (WHERE status = 'pending') AS DOUBLE PRECISION) as oldest_pending_minutes + FROM ocr_queue; +END; +$$ LANGUAGE plpgsql; \ No newline at end of file diff --git a/src/db/mod.rs b/src/db/mod.rs index 89e0d79..8e1882b 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -308,34 +308,8 @@ impl Database { .execute(&self.pool) .await?; - // Create the statistics function - sqlx::query( - r#" - CREATE OR REPLACE FUNCTION get_ocr_queue_stats() - RETURNS TABLE ( - pending_count BIGINT, - processing_count BIGINT, - failed_count BIGINT, - completed_today BIGINT, - avg_wait_time_minutes DOUBLE PRECISION, - oldest_pending_minutes DOUBLE PRECISION - ) AS $$ - BEGIN - RETURN QUERY - SELECT - COUNT(*) FILTER (WHERE status = 'pending') as pending_count, - COUNT(*) FILTER (WHERE status = 'processing') as processing_count, - COUNT(*) FILTER (WHERE status = 'failed' AND attempts >= max_attempts) as failed_count, - COUNT(*) FILTER (WHERE status = 'completed' AND completed_at >= CURRENT_DATE) as completed_today, - AVG(EXTRACT(EPOCH FROM (COALESCE(started_at, NOW()) - created_at))/60) FILTER (WHERE status IN ('processing', 'completed')) as avg_wait_time_minutes, - MAX(EXTRACT(EPOCH FROM (NOW() - created_at))/60) FILTER (WHERE status = 'pending') as oldest_pending_minutes - FROM ocr_queue; - END; - $$ LANGUAGE plpgsql - "# - ) - .execute(&self.pool) - .await?; + // NOTE: get_ocr_queue_stats() function is now managed by SQLx migrations + // See migrations/20250708000001_simplify_ocr_queue_stats.sql for current implementation Ok(()) } diff --git a/src/main.rs b/src/main.rs index 2fb95eb..dd4a143 100644 --- a/src/main.rs +++ b/src/main.rs @@ -254,13 +254,13 @@ async fn main() -> anyhow::Result<()> { info!("๐Ÿ“Š Latest migration now: {}", latest); } - // Verify the get_ocr_queue_stats function has the correct implementation + // Verify the get_queue_statistics function has the correct implementation let function_check = sqlx::query_scalar::<_, Option>( r#" SELECT pg_get_functiondef(p.oid) FROM pg_proc p JOIN pg_namespace n ON p.pronamespace = n.oid - WHERE n.nspname = 'public' AND p.proname = 'get_ocr_queue_stats' + WHERE n.nspname = 'public' AND p.proname = 'get_queue_statistics' "# ) .fetch_one(web_db.get_pool()) @@ -268,7 +268,7 @@ async fn main() -> anyhow::Result<()> { match function_check { Ok(Some(def)) => { - info!("๐Ÿ“‹ get_ocr_queue_stats function definition retrieved"); + info!("๐Ÿ“‹ get_queue_statistics function definition retrieved"); // Debug: print the actual function definition info!("๐Ÿ” Function definition (first 500 chars): {}", @@ -276,27 +276,21 @@ async fn main() -> anyhow::Result<()> { // Check if it contains the correct logic from our latest migration let has_documents_subquery = def.contains("FROM documents") && def.contains("ocr_status = 'completed'"); - let has_old_cte_logic = def.contains("document_stats") || def.contains("queue_stats"); - let has_old_completed_logic = def.contains("completed_at >= CURRENT_DATE"); + let has_cast_statements = def.contains("CAST("); info!("๐Ÿ” Function content analysis:"); info!(" Has documents subquery: {}", has_documents_subquery); - info!(" Has old CTE logic: {}", has_old_cte_logic); - info!(" Has old completed_at logic: {}", has_old_completed_logic); + info!(" Has CAST statements: {}", has_cast_statements); - if has_documents_subquery && !has_old_completed_logic { - info!("โœ… get_ocr_queue_stats function has correct NEW logic (uses documents table subquery)"); - } else if has_old_cte_logic { - error!("โŒ get_ocr_queue_stats function still uses CTE logic - should use simple subquery"); - } else if has_old_completed_logic { - error!("โŒ get_ocr_queue_stats function still uses old completed_at logic from ocr_queue table"); + if has_documents_subquery && has_cast_statements { + info!("โœ… get_queue_statistics function has correct logic (uses documents table subquery with CAST)"); } else { - error!("โŒ get_ocr_queue_stats function has unexpected structure"); + error!("โŒ get_queue_statistics function has unexpected structure"); } // Test the function execution at startup info!("๐Ÿงช Testing function execution at startup..."); - match sqlx::query("SELECT * FROM get_ocr_queue_stats()").fetch_one(web_db.get_pool()).await { + match sqlx::query("SELECT * FROM get_queue_statistics()").fetch_one(web_db.get_pool()).await { Ok(test_result) => { info!("โœ… Function executes successfully at startup"); let columns = test_result.columns(); @@ -310,8 +304,8 @@ async fn main() -> anyhow::Result<()> { } } } - Ok(None) => error!("โŒ get_ocr_queue_stats function does not exist after migration"), - Err(e) => error!("โŒ Failed to verify get_ocr_queue_stats function: {}", e), + Ok(None) => error!("โŒ get_queue_statistics function does not exist after migration"), + Err(e) => error!("โŒ Failed to verify get_queue_statistics function: {}", e), } } Err(e) => { diff --git a/src/ocr/queue.rs b/src/ocr/queue.rs index bc8929c..f3a4ac1 100644 --- a/src/ocr/queue.rs +++ b/src/ocr/queue.rs @@ -804,7 +804,7 @@ impl OcrQueueService { pg_get_function_arguments(p.oid) as arguments FROM pg_proc p JOIN pg_namespace n ON p.pronamespace = n.oid - WHERE n.nspname = 'public' AND p.proname = 'get_ocr_queue_stats' + WHERE n.nspname = 'public' AND p.proname = 'get_queue_statistics' "# ) .fetch_optional(&self.pool) @@ -820,15 +820,15 @@ impl OcrQueueService { let arguments: String = info.get("arguments"); tracing::debug!("Function info - name: {}, return_type: {}, arguments: {}", function_name, return_type, arguments); } else { - tracing::error!("get_ocr_queue_stats function not found!"); - return Err(anyhow::anyhow!("get_ocr_queue_stats function not found")); + tracing::error!("get_queue_statistics function not found!"); + return Err(anyhow::anyhow!("get_queue_statistics function not found")); } - tracing::debug!("OCR Queue: Calling get_ocr_queue_stats() function"); + tracing::debug!("OCR Queue: Calling get_queue_statistics() function"); let stats = sqlx::query( r#" - SELECT * FROM get_ocr_queue_stats() + SELECT * FROM get_queue_statistics() "# ) .fetch_one(&self.pool)