Files
readur/tests/integration_thread_separation_tests.rs

774 lines
22 KiB
Rust

/*!
* Thread Separation and Performance Unit Tests
*
* Tests for thread separation and performance optimization including:
* - Dedicated runtime separation (OCR, Background, DB)
* - Thread pool isolation and resource allocation
* - Performance monitoring and metrics
* - Memory usage and CPU utilization
* - Contention prevention between sync and OCR
* - Database connection pool separation
*/
use std::sync::{Arc, Mutex, atomic::{AtomicU64, AtomicU32, Ordering}};
use std::time::{Duration, Instant, SystemTime};
use std::thread;
use uuid::Uuid;
use tokio::runtime::{Builder, Runtime};
use tokio::time::{sleep, timeout};
/// Test runtime configuration and separation
#[test]
fn test_runtime_configuration() {
// Test OCR runtime configuration
let ocr_runtime = Builder::new_multi_thread()
.worker_threads(3)
.thread_name("readur-ocr")
.enable_all()
.build();
assert!(ocr_runtime.is_ok(), "OCR runtime should be created successfully");
// Test background runtime configuration
let background_runtime = Builder::new_multi_thread()
.worker_threads(2)
.thread_name("readur-background")
.enable_all()
.build();
assert!(background_runtime.is_ok(), "Background runtime should be created successfully");
// Test DB runtime configuration
let db_runtime = Builder::new_multi_thread()
.worker_threads(2)
.thread_name("readur-db")
.enable_all()
.build();
assert!(db_runtime.is_ok(), "DB runtime should be created successfully");
}
#[test]
fn test_thread_pool_isolation() {
// Test that different thread pools don't interfere with each other
let ocr_counter = Arc::new(AtomicU32::new(0));
let background_counter = Arc::new(AtomicU32::new(0));
let db_counter = Arc::new(AtomicU32::new(0));
// Create separate runtimes
let ocr_rt = Builder::new_multi_thread()
.worker_threads(2) // Reduced thread count
.thread_name("test-ocr")
.enable_time() // Enable timers
.build()
.unwrap();
let bg_rt = Builder::new_multi_thread()
.worker_threads(2)
.thread_name("test-bg")
.enable_time() // Enable timers
.build()
.unwrap();
let db_rt = Builder::new_multi_thread()
.worker_threads(2)
.thread_name("test-db")
.enable_time() // Enable timers
.build()
.unwrap();
// Use scoped threads to avoid deadlocks
std::thread::scope(|s| {
let ocr_counter_clone = Arc::clone(&ocr_counter);
let ocr_handle = s.spawn(move || {
ocr_rt.block_on(async {
for _ in 0..50 { // Reduced iterations
ocr_counter_clone.fetch_add(1, Ordering::Relaxed);
sleep(Duration::from_millis(1)).await;
}
});
});
let bg_counter_clone = Arc::clone(&background_counter);
let bg_handle = s.spawn(move || {
bg_rt.block_on(async {
for _ in 0..50 { // Reduced iterations
bg_counter_clone.fetch_add(1, Ordering::Relaxed);
sleep(Duration::from_millis(1)).await;
}
});
});
let db_counter_clone = Arc::clone(&db_counter);
let db_handle = s.spawn(move || {
db_rt.block_on(async {
for _ in 0..50 { // Reduced iterations
db_counter_clone.fetch_add(1, Ordering::Relaxed);
sleep(Duration::from_millis(1)).await;
}
});
});
// Wait for all threads to complete
ocr_handle.join().unwrap();
bg_handle.join().unwrap();
db_handle.join().unwrap();
});
// Verify all work completed
assert_eq!(ocr_counter.load(Ordering::Relaxed), 50);
assert_eq!(background_counter.load(Ordering::Relaxed), 50);
assert_eq!(db_counter.load(Ordering::Relaxed), 50);
}
#[tokio::test]
async fn test_database_connection_pool_separation() {
// Test separate connection pools for different workloads
let web_pool = DatabaseConnectionPool::new("web", 20, 2);
let background_pool = DatabaseConnectionPool::new("background", 30, 3);
assert_eq!(web_pool.max_connections, 20);
assert_eq!(web_pool.min_connections, 2);
assert_eq!(web_pool.pool_name, "web");
assert_eq!(background_pool.max_connections, 30);
assert_eq!(background_pool.min_connections, 3);
assert_eq!(background_pool.pool_name, "background");
// Test connection acquisition
let web_conn = web_pool.acquire_connection().await;
assert!(web_conn.is_ok(), "Should acquire web connection");
let bg_conn = background_pool.acquire_connection().await;
assert!(bg_conn.is_ok(), "Should acquire background connection");
// Test that pools are isolated
assert_ne!(web_pool.pool_id, background_pool.pool_id);
}
#[derive(Debug, Clone)]
struct DatabaseConnectionPool {
pool_name: String,
max_connections: u32,
min_connections: u32,
pool_id: Uuid,
active_connections: Arc<AtomicU32>,
}
impl DatabaseConnectionPool {
fn new(name: &str, max_conn: u32, min_conn: u32) -> Self {
Self {
pool_name: name.to_string(),
max_connections: max_conn,
min_connections: min_conn,
pool_id: Uuid::new_v4(),
active_connections: Arc::new(AtomicU32::new(0)),
}
}
async fn acquire_connection(&self) -> Result<DatabaseConnection, String> {
let current = self.active_connections.load(Ordering::Relaxed);
if current >= self.max_connections {
return Err("Max connections reached".to_string());
}
self.active_connections.fetch_add(1, Ordering::Relaxed);
Ok(DatabaseConnection {
id: Uuid::new_v4(),
pool_name: self.pool_name.clone(),
})
}
}
#[derive(Debug)]
struct DatabaseConnection {
id: Uuid,
pool_name: String,
}
#[test]
fn test_performance_metrics_collection() {
let metrics = PerformanceMetrics::new();
// Test OCR metrics
metrics.record_ocr_operation(Duration::from_millis(150), true);
metrics.record_ocr_operation(Duration::from_millis(200), true);
metrics.record_ocr_operation(Duration::from_millis(100), false); // Failed
let ocr_stats = metrics.get_ocr_stats();
assert_eq!(ocr_stats.total_operations, 3);
assert_eq!(ocr_stats.successful_operations, 2);
assert_eq!(ocr_stats.failed_operations, 1);
let avg_duration = ocr_stats.average_duration();
assert!(avg_duration > Duration::from_millis(100));
assert!(avg_duration < Duration::from_millis(200));
// Test sync metrics
metrics.record_sync_operation(Duration::from_secs(30), 50, 45);
metrics.record_sync_operation(Duration::from_secs(45), 100, 95);
let sync_stats = metrics.get_sync_stats();
assert_eq!(sync_stats.total_operations, 2);
assert_eq!(sync_stats.total_files_processed, 150);
assert_eq!(sync_stats.total_files_successful, 140);
}
struct PerformanceMetrics {
ocr_operations: Arc<Mutex<Vec<OcrOperation>>>,
sync_operations: Arc<Mutex<Vec<SyncOperation>>>,
memory_samples: Arc<Mutex<Vec<MemorySample>>>,
}
impl PerformanceMetrics {
fn new() -> Self {
Self {
ocr_operations: Arc::new(Mutex::new(Vec::new())),
sync_operations: Arc::new(Mutex::new(Vec::new())),
memory_samples: Arc::new(Mutex::new(Vec::new())),
}
}
fn record_ocr_operation(&self, duration: Duration, success: bool) {
let mut ops = self.ocr_operations.lock().unwrap();
ops.push(OcrOperation {
duration,
success,
timestamp: SystemTime::now(),
});
}
fn record_sync_operation(&self, duration: Duration, files_found: u32, files_processed: u32) {
let mut ops = self.sync_operations.lock().unwrap();
ops.push(SyncOperation {
duration,
files_found,
files_processed,
timestamp: SystemTime::now(),
});
}
fn get_ocr_stats(&self) -> OcrStats {
let ops = self.ocr_operations.lock().unwrap();
let total = ops.len() as u32;
let successful = ops.iter().filter(|op| op.success).count() as u32;
let failed = total - successful;
let total_duration: Duration = ops.iter().map(|op| op.duration).sum();
OcrStats {
total_operations: total,
successful_operations: successful,
failed_operations: failed,
total_duration,
}
}
fn get_sync_stats(&self) -> SyncStats {
let ops = self.sync_operations.lock().unwrap();
let total = ops.len() as u32;
let total_files = ops.iter().map(|op| op.files_found).sum();
let successful_files = ops.iter().map(|op| op.files_processed).sum();
SyncStats {
total_operations: total,
total_files_processed: total_files,
total_files_successful: successful_files,
}
}
}
#[derive(Debug, Clone)]
struct OcrOperation {
duration: Duration,
success: bool,
timestamp: SystemTime,
}
#[derive(Debug, Clone)]
struct SyncOperation {
duration: Duration,
files_found: u32,
files_processed: u32,
timestamp: SystemTime,
}
#[derive(Debug, Clone)]
struct MemorySample {
heap_usage_mb: f64,
stack_usage_mb: f64,
timestamp: SystemTime,
}
#[derive(Debug)]
struct OcrStats {
total_operations: u32,
successful_operations: u32,
failed_operations: u32,
total_duration: Duration,
}
impl OcrStats {
fn success_rate(&self) -> f64 {
if self.total_operations == 0 {
0.0
} else {
self.successful_operations as f64 / self.total_operations as f64
}
}
fn average_duration(&self) -> Duration {
if self.total_operations == 0 {
Duration::ZERO
} else {
self.total_duration / self.total_operations
}
}
}
#[derive(Debug)]
struct SyncStats {
total_operations: u32,
total_files_processed: u32,
total_files_successful: u32,
}
#[test]
fn test_memory_usage_monitoring() {
let memory_monitor = MemoryMonitor::new();
// Simulate memory allocation
let mut allocations = Vec::new();
for i in 0..100 {
let allocation = vec![0u8; 1024 * 1024]; // 1MB allocation
allocations.push(allocation);
if i % 10 == 0 {
memory_monitor.record_sample();
}
}
let stats = memory_monitor.get_stats();
assert!(stats.samples.len() >= 10, "Should have memory samples");
// Check that memory usage increased
let first_sample = &stats.samples[0];
let last_sample = &stats.samples[stats.samples.len() - 1];
assert!(last_sample.heap_usage_mb >= first_sample.heap_usage_mb,
"Memory usage should increase with allocations");
}
struct MemoryMonitor {
samples: Arc<Mutex<Vec<MemoryUsage>>>,
}
impl MemoryMonitor {
fn new() -> Self {
Self {
samples: Arc::new(Mutex::new(Vec::new())),
}
}
fn record_sample(&self) {
let usage = self.get_current_memory_usage();
let mut samples = self.samples.lock().unwrap();
samples.push(usage);
}
fn get_current_memory_usage(&self) -> MemoryUsage {
// In a real implementation, this would use system APIs to get actual memory usage
// For testing, we simulate increasing usage
let samples_count = self.samples.lock().unwrap().len();
MemoryUsage {
heap_usage_mb: 50.0 + (samples_count as f64 * 5.0), // Simulated growth
rss_mb: 100.0 + (samples_count as f64 * 10.0),
timestamp: SystemTime::now(),
}
}
fn get_stats(&self) -> MemoryStats {
let samples = self.samples.lock().unwrap().clone();
MemoryStats {
samples,
}
}
}
#[derive(Debug, Clone)]
struct MemoryUsage {
heap_usage_mb: f64,
rss_mb: f64,
timestamp: SystemTime,
}
#[derive(Debug)]
struct MemoryStats {
samples: Vec<MemoryUsage>,
}
#[test]
fn test_cpu_utilization_monitoring() {
let cpu_monitor = CpuMonitor::new();
// Simulate CPU-intensive work
let start = Instant::now();
let mut counter = 0u64;
while start.elapsed() < Duration::from_millis(100) {
counter += 1;
if counter % 10000 == 0 {
cpu_monitor.record_sample();
}
}
let stats = cpu_monitor.get_stats();
assert!(!stats.samples.is_empty(), "Should have CPU samples");
// Verify that CPU usage was recorded
let avg_usage = stats.average_usage();
assert!(avg_usage >= 0.0 && avg_usage <= 100.0, "CPU usage should be between 0-100%");
}
struct CpuMonitor {
samples: Arc<Mutex<Vec<CpuUsage>>>,
}
impl CpuMonitor {
fn new() -> Self {
Self {
samples: Arc::new(Mutex::new(Vec::new())),
}
}
fn record_sample(&self) {
let usage = self.get_current_cpu_usage();
let mut samples = self.samples.lock().unwrap();
samples.push(usage);
}
fn get_current_cpu_usage(&self) -> CpuUsage {
// Simulate CPU usage measurement
let samples_count = self.samples.lock().unwrap().len();
CpuUsage {
overall_percent: 25.0 + (samples_count as f64 * 2.0).min(75.0),
user_percent: 20.0 + (samples_count as f64 * 1.5).min(60.0),
system_percent: 5.0 + (samples_count as f64 * 0.5).min(15.0),
timestamp: SystemTime::now(),
}
}
fn get_stats(&self) -> CpuStats {
let samples = self.samples.lock().unwrap().clone();
CpuStats {
samples,
}
}
}
#[derive(Debug, Clone)]
struct CpuUsage {
overall_percent: f64,
user_percent: f64,
system_percent: f64,
timestamp: SystemTime,
}
#[derive(Debug)]
struct CpuStats {
samples: Vec<CpuUsage>,
}
impl CpuStats {
fn average_usage(&self) -> f64 {
if self.samples.is_empty() {
0.0
} else {
let total: f64 = self.samples.iter().map(|s| s.overall_percent).sum();
total / self.samples.len() as f64
}
}
}
#[tokio::test]
async fn test_thread_contention_prevention() {
// Test that OCR and sync operations don't contend for resources
let shared_resource = Arc::new(Mutex::new(SharedResource::new()));
let ocr_completed = Arc::new(AtomicU32::new(0));
let sync_completed = Arc::new(AtomicU32::new(0));
let resource_clone1 = Arc::clone(&shared_resource);
let ocr_counter = Arc::clone(&ocr_completed);
// Simulate OCR work
let ocr_handle = tokio::spawn(async move {
for _ in 0..10 {
{
let mut resource = resource_clone1.lock().unwrap();
resource.ocr_operations += 1;
}
sleep(Duration::from_millis(10)).await;
ocr_counter.fetch_add(1, Ordering::Relaxed);
}
});
let resource_clone2 = Arc::clone(&shared_resource);
let sync_counter = Arc::clone(&sync_completed);
// Simulate sync work
let sync_handle = tokio::spawn(async move {
for _ in 0..10 {
{
let mut resource = resource_clone2.lock().unwrap();
resource.sync_operations += 1;
}
sleep(Duration::from_millis(10)).await;
sync_counter.fetch_add(1, Ordering::Relaxed);
}
});
// Wait for both to complete
tokio::try_join!(ocr_handle, sync_handle).unwrap();
// Verify both completed successfully
assert_eq!(ocr_completed.load(Ordering::Relaxed), 10);
assert_eq!(sync_completed.load(Ordering::Relaxed), 10);
let resource = shared_resource.lock().unwrap();
assert_eq!(resource.ocr_operations, 10);
assert_eq!(resource.sync_operations, 10);
}
#[derive(Debug)]
struct SharedResource {
ocr_operations: u32,
sync_operations: u32,
}
impl SharedResource {
fn new() -> Self {
Self {
ocr_operations: 0,
sync_operations: 0,
}
}
}
#[test]
fn test_performance_degradation_detection() {
let performance_tracker = PerformanceTracker::new();
// Record baseline performance
for _ in 0..10 {
performance_tracker.record_operation(Duration::from_millis(100), OperationType::Sync);
}
// Record degraded performance
for _ in 0..5 {
performance_tracker.record_operation(Duration::from_millis(300), OperationType::Sync);
}
let degradation = performance_tracker.detect_performance_degradation(OperationType::Sync);
assert!(degradation.is_some(), "Should detect performance degradation");
let degradation = degradation.unwrap();
assert!(degradation.severity > 1.0, "Should show significant degradation");
assert!(degradation.baseline_duration < degradation.current_duration);
}
struct PerformanceTracker {
operations: Arc<Mutex<Vec<PerformanceOperation>>>,
}
impl PerformanceTracker {
fn new() -> Self {
Self {
operations: Arc::new(Mutex::new(Vec::new())),
}
}
fn record_operation(&self, duration: Duration, op_type: OperationType) {
let mut ops = self.operations.lock().unwrap();
ops.push(PerformanceOperation {
duration,
op_type,
timestamp: SystemTime::now(),
});
}
fn detect_performance_degradation(&self, op_type: OperationType) -> Option<PerformanceDegradation> {
let ops = self.operations.lock().unwrap();
let relevant_ops: Vec<_> = ops.iter()
.filter(|op| op.op_type == op_type)
.collect();
if relevant_ops.len() < 10 {
return None; // Need more data
}
// Calculate baseline (first 10 operations)
let baseline_duration: Duration = relevant_ops.iter()
.take(10)
.map(|op| op.duration)
.sum::<Duration>() / 10;
// Calculate recent performance (last 5 operations)
let recent_duration: Duration = relevant_ops.iter()
.rev()
.take(5)
.map(|op| op.duration)
.sum::<Duration>() / 5;
let severity = recent_duration.as_millis() as f64 / baseline_duration.as_millis() as f64;
if severity > 1.5 { // 50% degradation threshold
Some(PerformanceDegradation {
baseline_duration,
current_duration: recent_duration,
severity,
operation_type: op_type,
})
} else {
None
}
}
}
#[derive(Debug, Clone, PartialEq)]
enum OperationType {
Sync,
Ocr,
Database,
}
#[derive(Debug, Clone)]
struct PerformanceOperation {
duration: Duration,
op_type: OperationType,
timestamp: SystemTime,
}
#[derive(Debug)]
struct PerformanceDegradation {
baseline_duration: Duration,
current_duration: Duration,
severity: f64,
operation_type: OperationType,
}
#[tokio::test]
async fn test_backpressure_handling() {
let queue = Arc::new(Mutex::new(TaskQueue::new(10))); // Max 10 items
let processed_count = Arc::new(AtomicU32::new(0));
let stop_signal = Arc::new(AtomicU32::new(0));
let queue_clone = Arc::clone(&queue);
let count_clone = Arc::clone(&processed_count);
let stop_clone = Arc::clone(&stop_signal);
// Start processor with timeout
let processor_handle = tokio::spawn(async move {
let start_time = Instant::now();
loop {
// Exit if timeout exceeded (30 seconds)
if start_time.elapsed() > Duration::from_secs(30) {
break;
}
// Exit if stop signal received
if stop_clone.load(Ordering::Relaxed) > 0 {
break;
}
let task = {
let mut q = queue_clone.lock().unwrap();
q.pop()
};
match task {
Some(_task) => {
// Simulate processing
sleep(Duration::from_millis(5)).await; // Faster processing
count_clone.fetch_add(1, Ordering::Relaxed);
},
None => {
sleep(Duration::from_millis(2)).await; // Shorter sleep
}
}
}
});
// Try to add more tasks than queue capacity
let mut successful_adds = 0;
let mut backpressure_hits = 0;
// Add tasks more aggressively to trigger backpressure
for i in 0..25 {
let mut queue_ref = queue.lock().unwrap();
if queue_ref.try_push(Task { id: i }) {
successful_adds += 1;
} else {
backpressure_hits += 1;
}
drop(queue_ref);
sleep(Duration::from_millis(1)).await;
}
// Wait a bit for processing, then signal stop
sleep(Duration::from_millis(200)).await;
stop_signal.store(1, Ordering::Relaxed);
// Wait for processor with timeout
let _ = timeout(Duration::from_secs(5), processor_handle).await;
println!("Successful adds: {}, Backpressure hits: {}, Processed: {}",
successful_adds, backpressure_hits, processed_count.load(Ordering::Relaxed));
assert!(backpressure_hits > 0, "Should hit backpressure when queue is full");
assert!(successful_adds > 0, "Should successfully add some tasks");
// Don't require exact equality since processing may not complete all tasks
assert!(processed_count.load(Ordering::Relaxed) > 0, "Should process some tasks");
}
#[derive(Debug, Clone)]
struct Task {
id: u32,
}
struct TaskQueue {
items: Vec<Task>,
max_size: usize,
}
impl TaskQueue {
fn new(max_size: usize) -> Self {
Self {
items: Vec::new(),
max_size,
}
}
fn try_push(&mut self, task: Task) -> bool {
if self.items.len() >= self.max_size {
false // Backpressure
} else {
self.items.push(task);
true
}
}
fn pop(&mut self) -> Option<Task> {
if self.items.is_empty() {
None
} else {
Some(self.items.remove(0))
}
}
}