fix: Implement windowed average for bandwidth calculation

This commit is contained in:
Marco Cadetg (aider)
2025-05-10 14:47:51 +02:00
parent 489eea7ccf
commit 8328b6c572
2 changed files with 103 additions and 73 deletions
+96 -64
View File
@@ -7,7 +7,7 @@ use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant; // Added Instant back as it's used in on_tick
use std::time::{Duration, Instant}; // Added Duration
use crate::config::Config;
use crate::i18n::I18n;
@@ -74,7 +74,9 @@ pub struct App {
processes_data_shared: Option<Arc<Mutex<HashMap<u32, Process>>>>,
}
const PROCESS_INFO_UPDATE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
const PROCESS_INFO_UPDATE_INTERVAL: Duration = Duration::from_secs(5);
const RATE_CALCULATION_WINDOW: Duration = Duration::from_secs(5);
const RATE_HISTORY_PRUNE_EXTENSION: Duration = Duration::from_secs(2); // Keep data a bit longer than the window
impl App {
/// Create a new application instance
@@ -498,54 +500,18 @@ impl App {
// Store currently selected connection (if any)
let selected_conn_key = self.selected_connection.as_ref().map(|sc| self.get_connection_key(sc));
// Create a map of old connection states for rate calculation
// This captures the state from the end of the previous tick.
let mut old_rate_states = HashMap::new();
for old_conn in &self.connections {
old_rate_states.insert(
self.get_connection_key(old_conn),
(old_conn.prev_bytes_sent, old_conn.prev_bytes_received, old_conn.last_rate_update_time)
);
}
// Update connections from shared data updated by the background thread
if let Some(shared_data_arc) = &self.connections_data_shared {
let mut new_connections_list = shared_data_arc.lock().unwrap().clone();
// Calculate current rates for each connection in the new_connections_list
// and update their prev_bytes and last_rate_update_time for the next tick.
// Calculate average rates for each connection using its history
for conn_mut in &mut new_connections_list {
let conn_key = self.get_connection_key(conn_mut);
if let Some((prev_s, prev_r, last_update)) = old_rate_states.get(&conn_key) {
let time_delta = now.duration_since(*last_update);
let time_delta_secs = time_delta.as_secs_f64();
if time_delta_secs > 0.0 {
let bytes_sent_delta = conn_mut.bytes_sent.saturating_sub(*prev_s);
let bytes_received_delta = conn_mut.bytes_received.saturating_sub(*prev_r);
conn_mut.current_outgoing_rate_bps = bytes_sent_delta as f64 / time_delta_secs;
conn_mut.current_incoming_rate_bps = bytes_received_delta as f64 / time_delta_secs;
} else {
// Time delta is zero or negative, set rate to 0 or carry over previous rate if meaningful
// For simplicity, setting to 0 if time_delta_secs is not positive.
conn_mut.current_outgoing_rate_bps = 0.0;
conn_mut.current_incoming_rate_bps = 0.0;
}
} else {
// This connection was not in self.connections last tick (new for rate calculation)
conn_mut.current_outgoing_rate_bps = 0.0;
conn_mut.current_incoming_rate_bps = 0.0;
}
// Prepare for the next tick: current totals become previous, update time to now
conn_mut.prev_bytes_sent = conn_mut.bytes_sent;
conn_mut.prev_bytes_received = conn_mut.bytes_received;
conn_mut.last_rate_update_time = now;
Self::calculate_and_update_average_rate(conn_mut, now, RATE_CALCULATION_WINDOW);
}
// Extract keys for sorting from the (now rate-updated) new_connections_list
let mut keys_to_process = Vec::new();
for conn in &new_connections_list { // Iterate over new_connections_list
for conn in &new_connections_list {
let key = self.get_connection_key(conn);
keys_to_process.push(key);
}
@@ -600,29 +566,9 @@ impl App {
}
} else {
// connections_data_shared is None, likely before start_capture fully initializes it.
// Still, update rates for existing connections if any (e.g. from initial load)
// let now = Instant::now(); // Use `now` from top of on_tick
// Removed unused process_info_updated_this_tick and its related block.
// The main process info update logic is handled later in this function.
for conn in &mut self.connections { // This operates on the persistent self.connections
// Update rates.
let time_delta = now.duration_since(conn.last_rate_update_time); // This uses the correct last_rate_update_time
let time_delta_secs = time_delta.as_secs_f64();
if time_delta_secs > 0.0 {
let bytes_sent_delta = conn.bytes_sent.saturating_sub(conn.prev_bytes_sent);
let bytes_received_delta = conn.bytes_received.saturating_sub(conn.prev_bytes_received);
conn.current_outgoing_rate_bps = bytes_sent_delta as f64 / time_delta_secs;
conn.current_incoming_rate_bps = bytes_received_delta as f64 / time_delta_secs;
} else {
conn.current_outgoing_rate_bps = 0.0;
conn.current_incoming_rate_bps = 0.0;
}
conn.prev_bytes_sent = conn.bytes_sent;
conn.prev_bytes_received = conn.bytes_received;
conn.last_rate_update_time = now;
// We can still try to update rates for the existing self.connections using their history.
for conn_mut in &mut self.connections {
Self::calculate_and_update_average_rate(conn_mut, now, RATE_CALCULATION_WINDOW);
}
}
@@ -881,4 +827,90 @@ impl App {
fn test_method(&mut self) { // New diagnostic method
log::info!("<<<<<< App::test_method CALLED >>>>>>");
}
/// Calculates and updates the average send/receive rate for a connection based on its history.
/// Also prunes the history.
fn calculate_and_update_average_rate(conn: &mut Connection, current_time: Instant, window: Duration) {
if conn.rate_history.len() < 2 {
conn.current_incoming_rate_bps = 0.0;
conn.current_outgoing_rate_bps = 0.0;
return;
}
// Sort history by timestamp just in case, though it should be appended in order.
// conn.rate_history.sort_by_key(|k| k.0); // Usually not needed if NetworkMonitor appends correctly
let latest_entry = match conn.rate_history.last() {
Some(entry) => entry,
None => { // Should be caught by len < 2 check, but defensive
conn.current_incoming_rate_bps = 0.0;
conn.current_outgoing_rate_bps = 0.0;
return;
}
};
let latest_time = latest_entry.0;
let latest_bytes_sent = latest_entry.1;
let latest_bytes_received = latest_entry.2;
// Find the oldest entry within the window relative to the latest_time
let window_start_time = latest_time.checked_sub(window);
let mut oldest_relevant_entry_idx = None;
if let Some(start_time) = window_start_time {
// Iterate backwards to find the first entry outside the window, then pick the next one
// Or, find the first entry *within* the window from the start.
for (i, entry) in conn.rate_history.iter().enumerate() {
if entry.0 >= start_time {
oldest_relevant_entry_idx = Some(i);
break;
}
}
} else { // window_start_time underflowed, means window is larger than history span from latest_time
oldest_relevant_entry_idx = Some(0); // Use the very first entry
}
let (rate_in, rate_out) = match oldest_relevant_entry_idx {
Some(idx) if idx < conn.rate_history.len() -1 => { // Ensure we have at least two points: oldest_relevant and latest
let oldest_relevant_entry = &conn.rate_history[idx];
let time_delta = latest_time.duration_since(oldest_relevant_entry.0);
let time_delta_secs = time_delta.as_secs_f64();
if time_delta_secs > 0.001 { // Avoid division by zero or tiny intervals
let bytes_sent_delta = latest_bytes_sent.saturating_sub(oldest_relevant_entry.1);
let bytes_received_delta = latest_bytes_received.saturating_sub(oldest_relevant_entry.2);
let out_bps = (bytes_sent_delta as f64 * 8.0) / time_delta_secs;
let in_bps = (bytes_received_delta as f64 * 8.0) / time_delta_secs;
(in_bps, out_bps)
} else {
(0.0, 0.0)
}
}
_ => (0.0, 0.0), // Not enough data points in the window or only one point
};
conn.current_incoming_rate_bps = rate_in;
conn.current_outgoing_rate_bps = rate_out;
// Prune history: keep entries younger than `current_time - (window + buffer)`
let prune_older_than = current_time.checked_sub(window + RATE_HISTORY_PRUNE_EXTENSION);
if let Some(prune_time) = prune_older_than {
conn.rate_history.retain(|(t, _, _)| *t >= prune_time);
} else { // If window + buffer is too large, effectively don't prune based on current_time
// Or, if history is very short, this might not prune much.
// A simpler prune: ensure we don't keep excessively old data if current_time is far ahead of latest_time
if let Some(latest_hist_time) = conn.rate_history.last().map(|e| e.0) {
let absolute_prune_time = latest_hist_time.checked_sub(window + RATE_HISTORY_PRUNE_EXTENSION);
if let Some(apt) = absolute_prune_time {
conn.rate_history.retain(|(t, _, _)| *t >= apt);
}
}
}
// Ensure at least one entry is kept if history is not empty, to allow future rate calculations.
if conn.rate_history.is_empty() && latest_entry.0 >= current_time.checked_sub(window + RATE_HISTORY_PRUNE_EXTENSION).unwrap_or(latest_entry.0) {
conn.rate_history.push(*latest_entry);
}
}
}
+7 -9
View File
@@ -94,11 +94,12 @@ pub struct Connection {
pub last_activity: SystemTime,
pub service_name: Option<String>,
// Fields for current rate calculation
pub prev_bytes_sent: u64,
pub prev_bytes_received: u64,
pub last_rate_update_time: Instant,
pub prev_bytes_sent: u64, // Still used by NetworkMonitor for cumulative tracking if needed elsewhere
pub prev_bytes_received: u64, // Still used by NetworkMonitor for cumulative tracking if needed elsewhere
pub last_rate_update_time: Instant, // Still used by NetworkMonitor for cumulative tracking if needed elsewhere
pub current_incoming_rate_bps: f64,
pub current_outgoing_rate_bps: f64,
pub rate_history: Vec<(Instant, u64, u64)>, // Stores (timestamp, total_bytes_sent, total_bytes_received)
}
// get_service_name_raw function is removed.
@@ -132,6 +133,7 @@ impl Connection {
last_rate_update_time: Instant::now(),
current_incoming_rate_bps: 0.0,
current_outgoing_rate_bps: 0.0,
rate_history: Vec::new(),
};
new_conn
}
@@ -594,18 +596,13 @@ impl NetworkMonitor {
conn.bytes_received += data.len() as u64;
}
conn.state = state;
conn.rate_history.push((Instant::now(), conn.bytes_sent, conn.bytes_received));
// Update service name for existing connection
set_connection_service_name_for_connection(conn, service_lookup);
} else {
let mut new_conn =
Connection::new(Protocol::TCP, local_addr, remote_addr, state);
new_conn.last_activity = SystemTime::now();
// Attempt to get PID immediately for new packet-only connections
// Note: get_platform_process_for_connection is on NetworkMonitor,
// so this needs to be called where `self` (NetworkMonitor) is available,
// or PID resolution needs to be handled differently for packet-only connections.
// For now, we'll rely on get_connections to attempt PID resolution later,
// or the App's process thread. The change below in get_connections is more direct.
if is_outgoing {
new_conn.packets_sent += 1;
new_conn.bytes_sent += data.len() as u64;
@@ -613,6 +610,7 @@ impl NetworkMonitor {
new_conn.packets_received += 1;
new_conn.bytes_received += data.len() as u64;
}
new_conn.rate_history.push((Instant::now(), new_conn.bytes_sent, new_conn.bytes_received));
// Set service name for new connection before inserting
set_connection_service_name_for_connection(&mut new_conn, service_lookup);
monitor_connections.insert(conn_key, new_conn);