diff --git a/src/app.rs b/src/app.rs index 7a2fc9d..b4a9e05 100644 --- a/src/app.rs +++ b/src/app.rs @@ -141,7 +141,10 @@ impl App { self.start_snapshot_provider(connections.clone())?; // Start cleanup thread - self.start_cleanup_thread(connections)?; + self.start_cleanup_thread(connections.clone())?; + + // Start rate refresh thread + self.start_rate_refresh_thread(connections)?; // Mark loading as complete after a short delay let is_loading = Arc::clone(&self.is_loading); @@ -601,6 +604,33 @@ impl App { Ok(()) } + /// Start rate refresh thread to update rates for idle connections + fn start_rate_refresh_thread(&self, connections: Arc>) -> Result<()> { + let should_stop = Arc::clone(&self.should_stop); + + thread::spawn(move || { + info!("Rate refresh thread started"); + + loop { + if should_stop.load(Ordering::Relaxed) { + info!("Rate refresh thread stopping"); + break; + } + + // Refresh rates for all connections + // This ensures rates decay to zero for idle connections + for mut entry in connections.iter_mut() { + entry.value_mut().refresh_rates(); + } + + // Run every 1 second to balance responsiveness with performance + thread::sleep(Duration::from_secs(1)); + } + }); + + Ok(()) + } + /// Start cleanup thread to remove old connections fn start_cleanup_thread(&self, connections: Arc>) -> Result<()> { let should_stop = Arc::clone(&self.should_stop); diff --git a/src/network/merge.rs b/src/network/merge.rs index 5534df6..2c4c952 100644 --- a/src/network/merge.rs +++ b/src/network/merge.rs @@ -270,6 +270,10 @@ pub fn create_connection_from_packet(parsed: &ParsedPacket, now: SystemTime) -> conn.created_at = now; conn.last_activity = now; + // Initialize the rate tracker with the initial byte counts + // This prevents incorrect delta calculation on the first update + conn.rate_tracker.initialize_with_counts(conn.bytes_sent, conn.bytes_received); + conn } @@ -631,6 +635,33 @@ mod tests { assert_eq!(conn.packets_sent, 0); } + #[test] + fn test_new_connection_rate_tracker_initialization() { + // Test that the rate tracker is properly initialized for new connections + let packet = create_test_packet(true, false); + let conn = create_connection_from_packet(&packet, SystemTime::now()); + + // The connection should have initial bytes + assert_eq!(conn.bytes_sent, 100); + assert_eq!(conn.bytes_received, 0); + + // Now simulate merging another packet + let packet2 = create_test_packet(true, false); + let mut updated_conn = merge_packet_into_connection(conn, &packet2, SystemTime::now()); + + // Bytes should have increased + assert_eq!(updated_conn.bytes_sent, 200); + assert_eq!(updated_conn.bytes_received, 0); + + // Update rates - this should not cause a huge spike + updated_conn.update_rates(); + + // The rate should be reasonable (not include the initial 100 bytes as a spike) + // Since we just added 100 bytes, the rate should be based on that delta + // not on the full 200 bytes + assert!(updated_conn.current_outgoing_rate_bps >= 0.0); + } + #[test] fn test_tcp_state_transitions() { // Test SYN -> SYN_SENT diff --git a/src/network/types.rs b/src/network/types.rs index 75dc2b0..9de2bf2 100644 --- a/src/network/types.rs +++ b/src/network/types.rs @@ -549,8 +549,9 @@ impl Default for RateInfo { #[derive(Debug, Clone)] struct RateSample { timestamp: Instant, - bytes_sent: u64, - bytes_received: u64, + // Delta values since last sample + delta_sent: u64, + delta_received: u64, } #[derive(Debug, Clone)] @@ -559,6 +560,9 @@ pub struct RateTracker { window_duration: Duration, last_update: Instant, max_samples: usize, + // Keep track of last byte counts for delta calculation + last_bytes_sent: u64, + last_bytes_received: u64, } impl RateTracker { @@ -572,20 +576,36 @@ impl RateTracker { window_duration, last_update: Instant::now(), max_samples: 100, // Limit memory usage + last_bytes_sent: 0, + last_bytes_received: 0, } } + /// Initialize the tracker with initial byte counts + /// This should be called when creating a connection with existing bytes + pub fn initialize_with_counts(&mut self, bytes_sent: u64, bytes_received: u64) { + self.last_bytes_sent = bytes_sent; + self.last_bytes_received = bytes_received; + } + /// Update the rate tracker with new byte counts pub fn update(&mut self, bytes_sent: u64, bytes_received: u64) { let now = Instant::now(); - // Add new sample + // Calculate deltas since last update + let delta_sent = bytes_sent.saturating_sub(self.last_bytes_sent); + let delta_received = bytes_received.saturating_sub(self.last_bytes_received); + + // Add new sample with deltas self.samples.push_back(RateSample { timestamp: now, - bytes_sent, - bytes_received, + delta_sent, + delta_received, }); + // Update last values for next delta calculation + self.last_bytes_sent = bytes_sent; + self.last_bytes_received = bytes_received; self.last_update = now; // Remove samples outside the window @@ -612,39 +632,70 @@ impl RateTracker { /// Get the current incoming rate in bytes per second pub fn get_incoming_rate_bps(&self) -> f64 { - self.calculate_rate(|sample| sample.bytes_received) + self.calculate_rate_from_deltas(|sample| sample.delta_received) } /// Get the current outgoing rate in bytes per second pub fn get_outgoing_rate_bps(&self) -> f64 { - self.calculate_rate(|sample| sample.bytes_sent) + self.calculate_rate_from_deltas(|sample| sample.delta_sent) } - /// Calculate rate for a given byte field using the sliding window - fn calculate_rate(&self, byte_getter: F) -> f64 + /// Calculate rate using delta values for accurate sliding window calculation + fn calculate_rate_from_deltas(&self, delta_getter: F) -> f64 where F: Fn(&RateSample) -> u64, { - if self.samples.len() < 2 { + if self.samples.is_empty() { + return 0.0; + } + + // If we only have one sample, we can't calculate a rate yet + if self.samples.len() == 1 { return 0.0; } let oldest = self.samples.front().unwrap(); let newest = self.samples.back().unwrap(); - - let time_diff = newest + + // Calculate the time span of our samples + let time_span = newest .timestamp .duration_since(oldest.timestamp) .as_secs_f64(); // Need at least 100ms of data to avoid division by very small numbers - if time_diff < 0.1 { + if time_span < 0.1 { return 0.0; } - let byte_diff = byte_getter(newest).saturating_sub(byte_getter(oldest)) as f64; + // Sum all deltas in the window (skip the first sample as it might have incomplete delta) + let total_bytes: u64 = self.samples + .iter() + .skip(1) // Skip first sample which might have delta from before window + .map(delta_getter) + .sum(); - byte_diff / time_diff + // Calculate base rate + let base_rate = total_bytes as f64 / time_span; + + // Apply time-based decay more gently, similar to iftop's approach + let now = Instant::now(); + let time_since_last_sample = now.duration_since(newest.timestamp).as_secs_f64(); + + // More gentle decay - start decay after 3 seconds, fully decay by 10 seconds + if time_since_last_sample > 10.0 { + // After 10 seconds of no traffic, rate should be very close to zero + 0.0 + } else if time_since_last_sample > 3.0 { + // Exponential decay from 3 to 10 seconds + // This creates a more natural falloff similar to iftop + let decay_time = time_since_last_sample - 3.0; // 0 to 7 seconds + let decay_factor = (-decay_time / 3.0).exp(); // Exponential decay + base_rate * decay_factor + } else { + // No decay for first 3 seconds - show full rate + base_rate + } } /// Get the age of the oldest sample in the current window @@ -867,6 +918,23 @@ impl Connection { }; } + /// Refresh rates without adding new data - useful for idle connections + /// This ensures rates decay to zero when no traffic is flowing + pub fn refresh_rates(&mut self) { + // Just recalculate rates based on current time + // The calculate_rate_from_deltas method now checks sample age + self.current_incoming_rate_bps = self.rate_tracker.get_incoming_rate_bps(); + self.current_outgoing_rate_bps = self.rate_tracker.get_outgoing_rate_bps(); + + // Also update the legacy RateInfo struct + let now = Instant::now(); + self.current_rate_bps = RateInfo { + incoming_bps: self.current_incoming_rate_bps, + outgoing_bps: self.current_outgoing_rate_bps, + last_calculation: now, + }; + } + /// Get dynamic timeout for this connection based on protocol and state pub fn get_timeout(&self) -> Duration { match &self.protocol_state { @@ -982,11 +1050,11 @@ mod tests { fn test_rate_tracker_single_update() { let mut tracker = RateTracker::new(); - // First update - no rate yet (need at least 2 samples) + // First update establishes baseline tracker.update(1000, 500); assert_eq!(tracker.get_incoming_rate_bps(), 0.0); assert_eq!(tracker.get_outgoing_rate_bps(), 0.0); - // Test single update + // Test single update - need at least 2 samples for rate } #[test] @@ -1160,6 +1228,161 @@ mod tests { assert_eq!(tracker.get_incoming_rate_bps(), 0.0); } + #[test] + fn test_rate_tracker_cumulative_fix() { + // This test verifies the fix for the cumulative byte count issue + let mut tracker = RateTracker::new(); + + // Simulate a connection that has been running for a while + // with cumulative byte counts + tracker.update(1_000_000, 500_000); // 1MB sent, 500KB received total + thread::sleep(Duration::from_millis(100)); + + tracker.update(1_100_000, 550_000); // 100KB more sent, 50KB more received + thread::sleep(Duration::from_millis(100)); + + tracker.update(1_200_000, 600_000); // 100KB more sent, 50KB more received + + // The rate should be based on the deltas, not the cumulative values + // We sent 200KB in ~200ms = ~1MB/s, received 100KB in ~200ms = ~500KB/s + let outgoing_rate = tracker.get_outgoing_rate_bps(); + let incoming_rate = tracker.get_incoming_rate_bps(); + + // Should be approximately 1MB/s outgoing (1_000_000 bytes/sec) + assert!( + outgoing_rate > 800_000.0 && outgoing_rate < 1_200_000.0, + "Outgoing rate should be ~1MB/s, got: {}", + outgoing_rate + ); + + // Should be approximately 500KB/s incoming (500_000 bytes/sec) + assert!( + incoming_rate > 400_000.0 && incoming_rate < 600_000.0, + "Incoming rate should be ~500KB/s, got: {}", + incoming_rate + ); + } + + #[test] + fn test_rate_tracker_window_sliding() { + // Test that rates are calculated correctly as the window slides + let window_duration = Duration::from_millis(500); + let mut tracker = RateTracker::with_window_duration(window_duration); + + // Add initial samples + tracker.update(0, 0); + thread::sleep(Duration::from_millis(100)); + tracker.update(100_000, 50_000); // 100KB sent, 50KB received + + thread::sleep(Duration::from_millis(100)); + tracker.update(200_000, 100_000); // Another 100KB sent, 50KB received + + // Wait for window to slide past first samples + thread::sleep(Duration::from_millis(600)); + + // Add new samples with same rate + tracker.update(300_000, 150_000); // Another 100KB sent, 50KB received + thread::sleep(Duration::from_millis(100)); + tracker.update(400_000, 200_000); // Another 100KB sent, 50KB received + + // Rate should still be consistent despite window sliding + let outgoing_rate = tracker.get_outgoing_rate_bps(); + let incoming_rate = tracker.get_incoming_rate_bps(); + + // We're sending at ~1MB/s and receiving at ~500KB/s consistently + assert!( + outgoing_rate > 800_000.0 && outgoing_rate < 1_200_000.0, + "Outgoing rate after window slide: {}", + outgoing_rate + ); + assert!( + incoming_rate > 400_000.0 && incoming_rate < 600_000.0, + "Incoming rate after window slide: {}", + incoming_rate + ); + } + + #[test] + fn test_rate_decay_for_idle_connections() { + // Test that rates decay to zero when connections become idle + let mut tracker = RateTracker::new(); + + // Simulate active traffic + tracker.update(0, 0); + thread::sleep(Duration::from_millis(100)); + tracker.update(100_000, 50_000); // 100KB sent, 50KB received + + // Should have non-zero rate immediately after traffic + let initial_out = tracker.get_outgoing_rate_bps(); + let initial_in = tracker.get_incoming_rate_bps(); + assert!(initial_out > 0.0, "Should have outgoing traffic"); + assert!(initial_in > 0.0, "Should have incoming traffic"); + + // Wait 2 seconds (should still show full rate - no decay yet) + thread::sleep(Duration::from_millis(2000)); + + let still_active_out = tracker.get_outgoing_rate_bps(); + let still_active_in = tracker.get_incoming_rate_bps(); + + // Rates should still be the same (no decay for first 3 seconds) + assert_eq!(still_active_out, initial_out, "Should not decay within 3 seconds"); + assert_eq!(still_active_in, initial_in, "Should not decay within 3 seconds"); + + // Wait until decay starts (total 4 seconds - should start decay) + thread::sleep(Duration::from_millis(2000)); + + let decayed_out = tracker.get_outgoing_rate_bps(); + let decayed_in = tracker.get_incoming_rate_bps(); + + // Rates should be lower due to decay + assert!(decayed_out < initial_out, "Outgoing rate should start decaying after 3s"); + assert!(decayed_in < initial_in, "Incoming rate should start decaying after 3s"); + assert!(decayed_out > 0.0, "Should still have some rate at 4s"); + + // Wait for full decay (total 11 seconds - should be zero) + thread::sleep(Duration::from_millis(7000)); + + let final_out = tracker.get_outgoing_rate_bps(); + let final_in = tracker.get_incoming_rate_bps(); + + // After 10+ seconds of idle, rates should be zero + assert_eq!(final_out, 0.0, "Outgoing rate should be zero after 10+ seconds idle"); + assert_eq!(final_in, 0.0, "Incoming rate should be zero after 10+ seconds idle"); + } + + #[test] + fn test_connection_refresh_rates() { + // Test that refresh_rates() properly updates cached rate values + let mut conn = create_test_connection(); + + // Initialize the rate tracker properly + conn.rate_tracker.initialize_with_counts(0, 0); + + // Simulate first packet + conn.bytes_sent = 50_000; + conn.bytes_received = 25_000; + conn.update_rates(); + + thread::sleep(Duration::from_millis(100)); + + // Simulate more traffic + conn.bytes_sent = 100_000; + conn.bytes_received = 50_000; + conn.update_rates(); + + // Should have non-zero rates after recent traffic + assert!(conn.current_outgoing_rate_bps > 0.0, "Should have outgoing rate"); + assert!(conn.current_incoming_rate_bps > 0.0, "Should have incoming rate"); + + // Now simulate longer idle time and refresh (need >10s for zero) + thread::sleep(Duration::from_millis(11000)); + conn.refresh_rates(); + + // Rates should be zero after refresh with long idle connection + assert_eq!(conn.current_outgoing_rate_bps, 0.0, "Should be zero after 10+ seconds idle refresh"); + assert_eq!(conn.current_incoming_rate_bps, 0.0, "Should be zero after 10+ seconds idle refresh"); + } + #[test] fn test_enhanced_state_display_tcp() { let mut conn = create_test_connection();