fix: Improve connection merging and key generation for accurate rates

This commit is contained in:
Marco Cadetg (aider)
2025-05-10 17:57:16 +02:00
parent fd6cacdfd5
commit 97fa365fea
+69 -47
View File
@@ -418,70 +418,80 @@ impl NetworkMonitor {
log::debug!("NetworkMonitor::get_connections - Starting to fetch connections (without packet processing)");
// Packet processing is now handled externally.
// Get connections from system methods
let mut connections = Vec::new();
// Get connections from system methods (ss, netstat)
let mut platform_conns_vec = Vec::new();
log::debug!("NetworkMonitor::get_connections - Calling get_platform_connections (ss/netstat)");
self.get_platform_connections(&mut platform_conns_vec)?;
log::debug!("NetworkMonitor::get_connections - get_platform_connections returned {} connections", platform_conns_vec.len());
// Use platform-specific code to get connections
log::debug!("NetworkMonitor::get_connections - Calling get_platform_connections");
self.get_platform_connections(&mut connections)?;
log::debug!("NetworkMonitor::get_connections - get_platform_connections returned {} connections", connections.len());
// Use a HashMap to merge, ensuring packet data (especially rate_history) is prioritized.
// Key: String representation of (protocol, local_addr, remote_addr)
let mut merged_connections: HashMap<String, Connection> = HashMap::new();
// Add connections from packet capture
log::debug!("NetworkMonitor::get_connections - Merging packet capture connections (current count: {})", self.connections.len());
let mut packet_conn_updates: Vec<(String, u32, String)> = Vec::new();
// 1. Add all connections from packet capture (self.connections HashMap) first.
// These have the byte counts and rate_history.
for (key, packet_conn) in &self.connections {
// Consider active connections or those also seen by platform tools for merging
if packet_conn.is_active() || platform_conns_vec.iter().any(|pc| self.get_connection_key_for_merge(pc) == *key) {
merged_connections.insert(key.clone(), packet_conn.clone());
}
}
log::debug!("NetworkMonitor::get_connections - Initial merge map size from packet data: {}", merged_connections.len());
for (key, conn_from_packets) in &self.connections {
// Check if this connection exists in the list already (from platform tools)
let exists_in_platform_list = connections.iter().any(|c_plat| {
c_plat.protocol == conn_from_packets.protocol
&& c_plat.local_addr == conn_from_packets.local_addr
&& c_plat.remote_addr == conn_from_packets.remote_addr
});
if !exists_in_platform_list && conn_from_packets.is_active() {
let mut conn_to_add_to_results = conn_from_packets.clone();
// If packet-captured connection doesn't have a PID yet, try to resolve it.
if conn_to_add_to_results.pid.is_none() {
if let Some(process_details) = self.get_platform_process_for_connection(&conn_to_add_to_results) {
conn_to_add_to_results.pid = Some(process_details.pid);
conn_to_add_to_results.process_name = Some(process_details.name.clone());
// Mark this key for PID and name update in self.connections (the HashMap)
packet_conn_updates.push((key.clone(), process_details.pid, process_details.name.clone()));
}
// 2. Iterate through platform connections. If a match is found in merged_connections,
// update its state, PID, and process name. If not found, add it.
for platform_conn in platform_conns_vec {
let key = self.get_connection_key_for_merge(&platform_conn);
if let Some(existing_conn) = merged_connections.get_mut(&key) {
// Connection exists from packet capture, update with platform info
existing_conn.state = platform_conn.state; // Platform tools might have more current state
if platform_conn.pid.is_some() {
existing_conn.pid = platform_conn.pid;
}
connections.push(conn_to_add_to_results);
if platform_conn.process_name.is_some() {
existing_conn.process_name = platform_conn.process_name; // platform_conn.process_name is Option<String>, so clone is implicit or not needed if moved
}
// Byte counts, packet counts, and rate_history from existing_conn (packet data) are preserved.
} else {
// Connection only found by platform tools, add it.
// It will have 0 byte counts and empty rate_history initially.
merged_connections.insert(key, platform_conn);
}
}
// Update PIDs and names in self.connections (the HashMap) for packet-only connections where details were just found
for (key, pid_to_set, name_to_set) in packet_conn_updates {
if let Some(conn_in_map) = self.connections.get_mut(&key) {
conn_in_map.pid = Some(pid_to_set);
conn_in_map.process_name = Some(name_to_set);
log::debug!("NetworkMonitor::get_connections - Merge map size after platform data: {}", merged_connections.len());
let mut result_connections: Vec<Connection> = merged_connections.into_values().collect();
// For connections that might still lack PID/process name (e.g., purely from packets
// and platform lookup failed before, or purely from platform and it didn't have it),
// try one more pass of platform-specific PID resolution.
for conn_mut in &mut result_connections {
if conn_mut.pid.is_none() {
if let Some(process_details) = self.get_platform_process_for_connection(conn_mut) {
conn_mut.pid = Some(process_details.pid);
conn_mut.process_name = Some(process_details.name);
}
}
}
// Process information fetching is now handled by App::on_tick to allow for lazy loading.
// The self.collect_process_info flag and related block are removed from here.
// Sort connections by last activity
connections.sort_by(|a, b| b.last_activity.cmp(&a.last_activity));
// Sort connections by last activity (or other criteria as needed)
result_connections.sort_by(|a, b| b.last_activity.cmp(&a.last_activity));
// Filter localhost connections if the flag is set
if self.filter_localhost {
connections.retain(|conn| {
result_connections.retain(|conn| {
!(conn.local_addr.ip().is_loopback() && conn.remote_addr.ip().is_loopback())
});
}
// Set service names for all connections
for conn in &mut connections {
for conn in &mut result_connections {
set_connection_service_name_for_connection(conn, &self.service_lookup);
}
log::info!("NetworkMonitor::get_connections - Finished fetching connections. Total: {}", connections.len());
Ok(connections)
log::info!("NetworkMonitor::get_connections - Finished fetching connections. Total: {}", result_connections.len());
Ok(result_connections)
}
// Moved set_connection_service_name to be a free function to avoid borrow checker issues in process_packets.
@@ -578,11 +588,12 @@ impl NetworkMonitor {
};
// Create or update connection
let conn_protocol = Protocol::TCP; // Define protocol for key generation
let conn_key = format!(
"{:?}:{}-{:?}:{}",
Protocol::TCP,
conn_protocol, // Use actual protocol
local_addr,
Protocol::TCP,
conn_protocol, // Use actual protocol
remote_addr
);
@@ -636,11 +647,12 @@ impl NetworkMonitor {
};
// Create or update connection
let conn_protocol = Protocol::UDP; // Define protocol for key generation
let conn_key = format!(
"{:?}:{}-{:?}:{}",
Protocol::UDP,
conn_protocol, // Use actual protocol
local_addr,
Protocol::UDP,
conn_protocol, // Use actual protocol
remote_addr
);
@@ -778,6 +790,16 @@ impl NetworkMonitor {
Ok(())
}
/// Parse an address string into a SocketAddr
/// Helper to generate a consistent key for merging connections.
/// This key should match the one used for `self.connections` HashMap.
fn get_connection_key_for_merge(&self, conn: &Connection) -> String {
format!(
"{:?}:{}-{:?}:{}",
conn.protocol, conn.local_addr, conn.protocol, conn.remote_addr
)
}
/// Parse an address string into a SocketAddr
fn parse_addr(&self, addr_str: &str) -> Option<std::net::SocketAddr> {
let addr_str = addr_str.trim();