improve services lookup

This commit is contained in:
Marco Cadetg
2025-05-12 08:16:21 +02:00
parent 60dbb59c13
commit 1126106b71
3 changed files with 12206 additions and 502 deletions

View File

@@ -14,41 +14,41 @@ pub fn get_platform_connections(
// Debug output
debug!("Attempting to get connections using platform-specific methods");
// Use ss command to get TCP connections
info!("Running ss command to get TCP connections...");
let ss_result = monitor.get_connections_from_ss(connections);
if let Err(e) = &ss_result {
error!("Error running ss command: {}", e);
} else {
info!("ss command executed successfully");
}
// Use ss command to get TCP connections
info!("Running ss command to get TCP connections...");
let ss_result = monitor.get_connections_from_ss(connections);
if let Err(e) = &ss_result {
error!("Error running ss command: {}", e);
} else {
info!("ss command executed successfully");
}
// Use netstat to get UDP connections
info!("Running netstat command to get UDP connections...");
let netstat_result = monitor.get_connections_from_netstat(connections);
if let Err(e) = &netstat_result {
error!("Error running netstat command: {}", e);
} else {
info!("netstat command executed successfully");
}
// Use netstat to get UDP connections
info!("Running netstat command to get UDP connections...");
let netstat_result = monitor.get_connections_from_netstat(connections);
if let Err(e) = &netstat_result {
error!("Error running netstat command: {}", e);
} else {
info!("netstat command executed successfully");
}
// Check if we got any connections
// Check if we got any connections
debug!(
"Found {} connections from command output",
connections.len()
);
// If we didn't get any connections from commands, try using pcap
if connections.is_empty() {
warn!("No connections found from commands, trying packet capture...");
monitor.get_connections_from_pcap(connections)?;
debug!(
"Found {} connections from command output",
"Found {} connections from packet capture",
connections.len()
);
}
// If we didn't get any connections from commands, try using pcap
if connections.is_empty() {
warn!("No connections found from commands, trying packet capture...");
monitor.get_connections_from_pcap(connections)?;
debug!(
"Found {} connections from packet capture",
connections.len()
);
}
// Note: get_linux_process_for_connection, get_process_by_pid,
// Note: get_linux_process_for_connection, get_process_by_pid,
// get_connections_from_ss, get_connections_from_netstat, get_connections_from_pcap
// remain methods on NetworkMonitor as they are called via `monitor.method_name()`
Ok(())
@@ -85,91 +85,105 @@ impl NetworkMonitor {
if output.status.success() {
let text = String::from_utf8_lossy(&output.stdout);
let line_count = text.lines().count();
debug!("'ss -tupn' command successful. Output lines: {}", line_count);
if line_count < 5 && line_count > 0 { // Log short output
debug!(
"'ss -tupn' command successful. Output lines: {}",
line_count
);
if line_count < 5 && line_count > 0 {
// Log short output
debug!("'ss -tupn' output (first {} lines):\n{}", line_count, text);
} else if line_count == 0 {
debug!("'ss -tupn' produced no output.");
}
for line in text.lines().skip(1) {
// Skip header
let fields: Vec<&str> = line.split_whitespace().collect();
if fields.len() < 5 {
continue;
}
for line in text.lines().skip(1) {
// Skip header
let fields: Vec<&str> = line.split_whitespace().collect();
if fields.len() < 5 {
continue;
}
// ss -tupn output fields: Netid, State, Recv-Q, Send-Q, Local Address:Port, Peer Address:Port, Process
// Example: tcp ESTAB 0 0 10.0.0.1:1234 10.0.0.2:80 users:(("myproc",pid=789,fd=5))
// Example: udp UNCONN 0 0 *:bootpc *:* users:(("dhclient",pid=123,fd=3))
// ss -tupn output fields: Netid, State, Recv-Q, Send-Q, Local Address:Port, Peer Address:Port, Process
// Example: tcp ESTAB 0 0 10.0.0.1:1234 10.0.0.2:80 users:(("myproc",pid=789,fd=5))
// Example: udp UNCONN 0 0 *:bootpc *:* users:(("dhclient",pid=123,fd=3))
// Parse protocol (Netid)
let protocol = match fields[0] {
"tcp" | "tcp6" => Protocol::TCP,
"udp" | "udp6" => Protocol::UDP,
_ => continue, // Skip if not tcp or udp
};
// Parse protocol (Netid)
let protocol = match fields[0] {
"tcp" | "tcp6" => Protocol::TCP,
"udp" | "udp6" => Protocol::UDP,
_ => continue, // Skip if not tcp or udp
};
// Parse state
let state_str = if fields.len() > 1 { fields[1] } else { "" };
let state = match state_str {
"ESTAB" => ConnectionState::Established,
"LISTEN" => ConnectionState::Listen,
"TIME-WAIT" => ConnectionState::TimeWait,
"CLOSE-WAIT" => ConnectionState::CloseWait,
"SYN-SENT" => ConnectionState::SynSent,
"SYN-RECV" => ConnectionState::SynReceived,
"FIN-WAIT-1" => ConnectionState::FinWait1,
"FIN-WAIT-2" => ConnectionState::FinWait2,
"LAST-ACK" => ConnectionState::LastAck,
"CLOSING" => ConnectionState::Closing,
"UNCONN" if protocol == Protocol::UDP => ConnectionState::Established, // UDP is connectionless, UNCONN is normal
_ => ConnectionState::Unknown,
};
// Parse state
let state_str = if fields.len() > 1 { fields[1] } else { "" };
let state = match state_str {
"ESTAB" => ConnectionState::Established,
"LISTEN" => ConnectionState::Listen,
"TIME-WAIT" => ConnectionState::TimeWait,
"CLOSE-WAIT" => ConnectionState::CloseWait,
"SYN-SENT" => ConnectionState::SynSent,
"SYN-RECV" => ConnectionState::SynReceived,
"FIN-WAIT-1" => ConnectionState::FinWait1,
"FIN-WAIT-2" => ConnectionState::FinWait2,
"LAST-ACK" => ConnectionState::LastAck,
"CLOSING" => ConnectionState::Closing,
"UNCONN" if protocol == Protocol::UDP => ConnectionState::Established, // UDP is connectionless, UNCONN is normal
_ => ConnectionState::Unknown,
};
// Ensure we have enough fields for addresses
if fields.len() < 6 { // Need up to Peer Address:Port
continue;
}
// Ensure we have enough fields for addresses
if fields.len() < 6 {
// Need up to Peer Address:Port
continue;
}
// Parse local and remote addresses (fields[4] and fields[5])
if let (Some(local), Some(remote)) =
(self.parse_addr(fields[4]), self.parse_addr(fields[5]))
{
let mut conn = Connection::new(protocol, local, remote, state);
// Parse local and remote addresses (fields[4] and fields[5])
if let (Some(local), Some(remote)) =
(self.parse_addr(fields[4]), self.parse_addr(fields[5]))
{
let mut conn = Connection::new(protocol, local, remote, state);
// Parse PID and process name (fields[6], if present)
if fields.len() >= 7 {
let process_info = fields[6]; // Process info is in the 7th field (index 6)
if let Some(pid_start) = process_info.find("pid=") {
let pid_part = &process_info[pid_start + 4..];
if let Some(pid_end) = pid_part.find(',') {
if let Ok(pid) = pid_part[..pid_end].parse::<u32>() {
conn.pid = Some(pid);
// Parse PID and process name (fields[6], if present)
if fields.len() >= 7 {
let process_info = fields[6]; // Process info is in the 7th field (index 6)
if let Some(pid_start) = process_info.find("pid=") {
let pid_part = &process_info[pid_start + 4..];
if let Some(pid_end) = pid_part.find(',') {
if let Ok(pid) = pid_part[..pid_end].parse::<u32>() {
conn.pid = Some(pid);
// Try to get process name from users:(("name",pid=...,fd=...))
if let Some(name_section_start) = process_info.find("users:((\"") {
let name_candidate_part = &process_info[name_section_start + 9..];
if let Some(name_candidate_end) = name_candidate_part.find('"') {
let raw_name = &name_candidate_part[..name_candidate_end];
let trimmed_name = raw_name
.trim_start_matches("(\"")
.trim_end_matches('"')
.to_string();
conn.process_name = Some(trimmed_name);
// Try to get process name from users:(("name",pid=...,fd=...))
if let Some(name_section_start) =
process_info.find("users:((\"")
{
let name_candidate_part =
&process_info[name_section_start + 9..];
if let Some(name_candidate_end) =
name_candidate_part.find('"')
{
let raw_name =
&name_candidate_part[..name_candidate_end];
let trimmed_name = raw_name
.trim_start_matches("(\"")
.trim_end_matches('"')
.to_string();
conn.process_name = Some(trimmed_name);
}
}
}
}
}
}
connections.push(conn);
}
}
connections.push(conn);
}
}
} else {
let stderr_text = String::from_utf8_lossy(&output.stderr);
error!("'ss -tupn' command failed with status {}. Stderr: {}", output.status, stderr_text);
error!(
"'ss -tupn' command failed with status {}. Stderr: {}",
output.status, stderr_text
);
// Proceeding, as netstat might provide data or this is a transient issue.
}
}
@@ -178,7 +192,10 @@ impl NetworkMonitor {
return Err(e.into()); // Propagate the error to stop further processing in get_platform_connections for this call
}
}
debug!("Finished processing 'ss' output. Current connections vec size: {}", connections.len());
debug!(
"Finished processing 'ss' output. Current connections vec size: {}",
connections.len()
);
Ok(())
}
@@ -192,77 +209,88 @@ impl NetworkMonitor {
if output.status.success() {
let text = String::from_utf8_lossy(&output.stdout);
let line_count = text.lines().count();
debug!("'netstat -tupn' command successful. Output lines: {}", line_count);
if line_count < 5 && line_count > 0 { // Log short output
debug!("'netstat -tupn' output (first {} lines):\n{}", line_count, text);
debug!(
"'netstat -tupn' command successful. Output lines: {}",
line_count
);
if line_count < 5 && line_count > 0 {
// Log short output
debug!(
"'netstat -tupn' output (first {} lines):\n{}",
line_count, text
);
} else if line_count == 0 {
debug!("'netstat -tupn' produced no output.");
}
for line in text.lines().skip(2) {
// Skip headers
let fields: Vec<&str> = line.split_whitespace().collect();
if fields.len() < 5 {
continue;
}
for line in text.lines().skip(2) {
// Skip headers
let fields: Vec<&str> = line.split_whitespace().collect();
if fields.len() < 5 {
continue;
}
// Parse protocol
let protocol = match fields[0].to_lowercase().as_str() {
"tcp" | "tcp6" => Protocol::TCP,
"udp" | "udp6" => Protocol::UDP,
_ => continue,
};
// Parse protocol
let protocol = match fields[0].to_lowercase().as_str() {
"tcp" | "tcp6" => Protocol::TCP,
"udp" | "udp6" => Protocol::UDP,
_ => continue,
};
// Parse state
let state_pos = 5;
let state = if fields.len() > state_pos {
match fields[state_pos] {
"ESTABLISHED" => ConnectionState::Established,
"LISTENING" | "LISTEN" => ConnectionState::Listen,
"TIME_WAIT" => ConnectionState::TimeWait,
"CLOSE_WAIT" => ConnectionState::CloseWait,
"SYN_SENT" => ConnectionState::SynSent,
"SYN_RECEIVED" | "SYN_RECV" => ConnectionState::SynReceived,
"FIN_WAIT_1" => ConnectionState::FinWait1,
"FIN_WAIT_2" => ConnectionState::FinWait2,
"LAST_ACK" => ConnectionState::LastAck,
"CLOSING" => ConnectionState::Closing,
_ => ConnectionState::Unknown,
}
} else {
ConnectionState::Unknown
};
// Parse state
let state_pos = 5;
let state = if fields.len() > state_pos {
match fields[state_pos] {
"ESTABLISHED" => ConnectionState::Established,
"LISTENING" | "LISTEN" => ConnectionState::Listen,
"TIME_WAIT" => ConnectionState::TimeWait,
"CLOSE_WAIT" => ConnectionState::CloseWait,
"SYN_SENT" => ConnectionState::SynSent,
"SYN_RECEIVED" | "SYN_RECV" => ConnectionState::SynReceived,
"FIN_WAIT_1" => ConnectionState::FinWait1,
"FIN_WAIT_2" => ConnectionState::FinWait2,
"LAST_ACK" => ConnectionState::LastAck,
"CLOSING" => ConnectionState::Closing,
_ => ConnectionState::Unknown,
}
} else {
ConnectionState::Unknown
};
// Parse local and remote addresses
let local_idx = 1;
let remote_idx = 2;
// Parse local and remote addresses
let local_idx = 1;
let remote_idx = 2;
if let (Some(local), Some(remote)) = (
self.parse_addr(fields[local_idx]),
self.parse_addr(fields[remote_idx]),
) {
let mut conn = Connection::new(protocol, local, remote, state);
if let (Some(local), Some(remote)) = (
self.parse_addr(fields[local_idx]),
self.parse_addr(fields[remote_idx]),
) {
let mut conn = Connection::new(protocol, local, remote, state);
// Parse PID and process name from "PID/Program name"
let pid_program_pos = 6; // Assuming this is the correct index for "PID/Program name"
if fields.len() > pid_program_pos && fields[pid_program_pos] != "-" {
let pid_str_parts: Vec<&str> = fields[pid_program_pos].split('/').collect();
if let Ok(pid) = pid_str_parts[0].parse::<u32>() {
conn.pid = Some(pid);
if pid_str_parts.len() > 1 {
// Check if the process name part is not empty or just "-"
if !pid_str_parts[1].is_empty() && pid_str_parts[1] != "-" {
conn.process_name = Some(pid_str_parts[1].to_string());
// Parse PID and process name from "PID/Program name"
let pid_program_pos = 6; // Assuming this is the correct index for "PID/Program name"
if fields.len() > pid_program_pos && fields[pid_program_pos] != "-" {
let pid_str_parts: Vec<&str> =
fields[pid_program_pos].split('/').collect();
if let Ok(pid) = pid_str_parts[0].parse::<u32>() {
conn.pid = Some(pid);
if pid_str_parts.len() > 1 {
// Check if the process name part is not empty or just "-"
if !pid_str_parts[1].is_empty() && pid_str_parts[1] != "-" {
conn.process_name = Some(pid_str_parts[1].to_string());
}
}
}
}
connections.push(conn);
}
}
connections.push(conn);
}
}
} else {
let stderr_text = String::from_utf8_lossy(&output.stderr);
error!("'netstat -tupn' command failed with status {}. Stderr: {}", output.status, stderr_text);
error!(
"'netstat -tupn' command failed with status {}. Stderr: {}",
output.status, stderr_text
);
}
}
Err(e) => {
@@ -270,7 +298,10 @@ impl NetworkMonitor {
return Err(e.into());
}
}
debug!("Finished processing 'netstat' output. Current connections vec size after netstat: {}", connections.len());
debug!(
"Finished processing 'netstat' output. Current connections vec size after netstat: {}",
connections.len()
);
Ok(())
}
@@ -326,6 +357,7 @@ fn try_ss_command(connection: &Connection) -> Option<Process> {
let proto_flag = match connection.protocol {
Protocol::TCP => "-t",
Protocol::UDP => "-u",
Protocol::ICMP => return None, // ss doesn't support ICMP directly
};
let local_port = connection.local_addr.port();
@@ -366,10 +398,7 @@ fn try_ss_command(connection: &Connection) -> Option<Process> {
format!("process-{}", pid)
};
return Some(Process {
pid,
name,
});
return Some(Process { pid, name });
}
}
}
@@ -411,6 +440,7 @@ fn try_netstat_command(connection: &Connection) -> Option<Process> {
fields[proto_idx].eq_ignore_ascii_case("udp")
|| fields[proto_idx].eq_ignore_ascii_case("udp6")
}
Protocol::ICMP => false, // netstat doesn't show ICMP connections directly
};
if matches_protocol
@@ -427,10 +457,7 @@ fn try_netstat_command(connection: &Connection) -> Option<Process> {
let name = get_process_name_by_pid(pid)
.unwrap_or_else(|| format!("process-{}", pid));
return Some(Process {
pid,
name,
});
return Some(Process { pid, name });
}
}
@@ -505,12 +532,8 @@ fn try_proc_parsing(connection: &Connection) -> Option<Process> {
.contains(&format!("socket:[{}]", inode))
{
// Found process with this socket
return get_process_name_by_pid(pid).map(
|name| Process {
pid,
name,
},
);
return get_process_name_by_pid(pid)
.map(|name| Process { pid, name });
}
}
}

View File

@@ -24,8 +24,7 @@ mod macos;
pub enum Protocol {
TCP,
UDP,
// ICMP, // Variant removed as unused
// Other(u8), // Variant removed as unused
ICMP,
}
impl std::fmt::Display for Protocol {
@@ -33,8 +32,7 @@ impl std::fmt::Display for Protocol {
match self {
Protocol::TCP => write!(f, "TCP"),
Protocol::UDP => write!(f, "UDP"),
// Protocol::ICMP => write!(f, "ICMP"), // Variant removed
// Protocol::Other(proto) => write!(f, "Proto({})", proto), // Variant removed
Protocol::ICMP => write!(f, "ICMP"),
}
}
}
@@ -48,12 +46,16 @@ pub enum ConnectionState {
FinWait1,
FinWait2,
TimeWait,
// Closed, // Variant removed as unused
Closed,
CloseWait,
LastAck,
Listen,
Closing,
Reset, // Added Reset variant
Reset,
IcmpEchoRequest,
IcmpEchoReply,
IcmpDestinationUnreachable,
IcmpTimeExceeded,
Unknown,
}
@@ -66,12 +68,16 @@ impl std::fmt::Display for ConnectionState {
ConnectionState::FinWait1 => write!(f, "FIN_WAIT_1"),
ConnectionState::FinWait2 => write!(f, "FIN_WAIT_2"),
ConnectionState::TimeWait => write!(f, "TIME_WAIT"),
// ConnectionState::Closed => write!(f, "CLOSED"), // Variant removed
ConnectionState::Closed => write!(f, "CLOSED"),
ConnectionState::CloseWait => write!(f, "CLOSE_WAIT"),
ConnectionState::LastAck => write!(f, "LAST_ACK"),
ConnectionState::Listen => write!(f, "LISTEN"),
ConnectionState::Closing => write!(f, "CLOSING"),
ConnectionState::Reset => write!(f, "RESET"),
ConnectionState::IcmpEchoRequest => write!(f, "ICMP_ECHO_REQUEST"),
ConnectionState::IcmpEchoReply => write!(f, "ICMP_ECHO_REPLY"),
ConnectionState::IcmpDestinationUnreachable => write!(f, "ICMP_DEST_UNREACH"),
ConnectionState::IcmpTimeExceeded => write!(f, "ICMP_TIME_EXCEEDED"),
ConnectionState::Unknown => write!(f, "UNKNOWN"),
}
}
@@ -93,17 +99,11 @@ pub struct Connection {
pub created_at: SystemTime,
pub last_activity: SystemTime,
pub service_name: Option<String>,
// Fields for current rate calculation
// pub prev_bytes_sent: u64, // Field removed as unused
// pub prev_bytes_received: u64, // Field removed as unused
// pub last_rate_update_time: Instant, // Field removed as unused
pub current_incoming_rate_bps: f64,
pub current_outgoing_rate_bps: f64,
pub rate_history: Vec<(Instant, u64, u64)>, // Stores (timestamp, total_bytes_sent, total_bytes_received)
}
// get_service_name_raw function is removed.
impl Connection {
/// Create a new connection
pub fn new(
@@ -113,7 +113,7 @@ impl Connection {
state: ConnectionState,
) -> Self {
let now = SystemTime::now();
let new_conn = Self { // Removed mut
let new_conn = Self {
protocol,
local_addr,
remote_addr,
@@ -127,10 +127,6 @@ impl Connection {
created_at: now,
last_activity: now,
service_name: None, // Service name will be set by NetworkMonitor
// Initialize new fields for rate calculation
// prev_bytes_sent: 0, // Field removed
// prev_bytes_received: 0, // Field removed
// last_rate_update_time: Instant::now(), // Field removed
current_incoming_rate_bps: 0.0,
current_outgoing_rate_bps: 0.0,
rate_history: Vec::new(),
@@ -163,26 +159,16 @@ impl Connection {
pub struct Process {
pub pid: u32,
pub name: String,
// pub command_line: Option<String>, // Field removed as unused
// pub user: Option<String>, // Field removed as unused
// pub cpu_usage: Option<f32>, // Field removed as unused
// pub memory_usage: Option<u64>, // Field removed as unused
}
// IP location information - struct removed as unused (dependent on get_ip_location)
/// Network monitor
pub struct NetworkMonitor {
interface: Option<String>,
capture: Option<Capture<pcap::Active>>,
connections: HashMap<String, Connection>,
// geo_db: Option<maxminddb::Reader<Vec<u8>>>, // Field removed as unused (dependent on get_ip_location)
service_lookup: ServiceLookup, // Added ServiceLookup
// collect_process_info: bool, // Removed, App will manage process info fetching
service_lookup: ServiceLookup,
filter_localhost: bool,
local_ips: std::collections::HashSet<IpAddr>,
// last_packet_check: Instant, // Removed for continuous processing by default
// initial_packet_processing_done: bool, // Removed
}
/// Manages lookup of service names from a services file.
@@ -198,8 +184,11 @@ impl ServiceLookup {
let file_path = Path::new(file_path_str);
if !file_path.exists() {
warn!("Service definition file not found at '{}'. Service names will not be available.", file_path_str);
return Ok(Self { services }); // Return empty lookup if file not found
warn!(
"Service definition file not found at '{}'. Service names will not be available.",
file_path_str
);
return Ok(Self { services });
}
let file = File::open(file_path)?;
@@ -232,14 +221,20 @@ impl ServiceLookup {
// Split port/protocol
let port_protocol_parts: Vec<&str> = port_protocol_str.split('/').collect();
if port_protocol_parts.len() != 2 {
debug!("Skipping malformed port/protocol in services file: {} from line: {}", port_protocol_str, line);
debug!(
"Skipping malformed port/protocol in services file: {} from line: {}",
port_protocol_str, line
);
continue;
}
let port = match port_protocol_parts[0].parse::<u16>() {
Ok(p) => p,
Err(_) => {
debug!("Skipping invalid port in services file: {} from line: {}", port_protocol_parts[0], line);
debug!(
"Skipping invalid port in services file: {} from line: {}",
port_protocol_parts[0], line
);
continue;
}
};
@@ -249,7 +244,10 @@ impl ServiceLookup {
"tcp" => Protocol::TCP,
"udp" => Protocol::UDP,
_ => {
debug!("Skipping unknown protocol in services file: {} from line: {}", protocol_str, line);
debug!(
"Skipping unknown protocol in services file: {} from line: {}",
protocol_str, line
);
continue;
}
};
@@ -258,7 +256,11 @@ impl ServiceLookup {
// If a port/protocol combo is already defined, the first one encountered wins.
services.entry((port, protocol)).or_insert(service_name);
}
debug!("ServiceLookup initialized with {} entries from '{}'", services.len(), file_path_str);
debug!(
"ServiceLookup initialized with {} entries from '{}'",
services.len(),
file_path_str
);
Ok(Self { services })
}
@@ -271,7 +273,10 @@ impl ServiceLookup {
/// Sets the service name for a given connection based on its port and protocol.
/// This function encapsulates the logic for choosing which port (local or remote)
/// determines the service.
fn set_connection_service_name_for_connection(conn: &mut Connection, service_lookup: &ServiceLookup) {
fn set_connection_service_name_for_connection(
conn: &mut Connection,
service_lookup: &ServiceLookup,
) {
let local_port = conn.local_addr.port();
let remote_port = conn.remote_addr.port();
let protocol = conn.protocol;
@@ -282,24 +287,15 @@ fn set_connection_service_name_for_connection(conn: &mut Connection, service_loo
// For listening sockets, the service is always on the local port
final_service_name = service_lookup.get(local_port, protocol);
} else {
// For other states, check if local port is a well-known service port
// and has a known service name.
let local_service_name_opt = service_lookup.get(local_port, protocol);
let local_is_well_known_port = local_port <= 1023; // Standard service port range
if local_is_well_known_port && local_service_name_opt.is_some() {
if local_service_name_opt.is_some() {
final_service_name = local_service_name_opt;
} else {
// If local port is not a well-known service, check the remote port.
let remote_service_name_opt = service_lookup.get(remote_port, protocol);
let remote_is_well_known_port = remote_port <= 1023;
if remote_is_well_known_port && remote_service_name_opt.is_some() {
if remote_service_name_opt.is_some() {
final_service_name = remote_service_name_opt;
}
// If neither are "well-known services" on standard ports with known names,
// the service name remains None, matching the original logic's strictness.
// More sophisticated heuristics (e.g. for non-standard ports) could be added here if desired.
}
}
conn.service_name = final_service_name;
@@ -311,7 +307,10 @@ impl NetworkMonitor {
log::info!("NetworkMonitor::new - Initializing");
let mut capture = if let Some(iface) = &interface {
// Open capture on specific interface
log::info!("NetworkMonitor::new - Listing devices for specific interface: {}", iface);
log::info!(
"NetworkMonitor::new - Listing devices for specific interface: {}",
iface
);
let device_list = Device::list()?;
log::info!("NetworkMonitor::new - Device list obtained");
let device = device_list
@@ -326,13 +325,19 @@ impl NetworkMonitor {
.snaplen(65535)
.promisc(true)
.open()?;
log::info!("NetworkMonitor::new - Capture opened on interface: {}", device.name);
log::info!(
"NetworkMonitor::new - Capture opened on interface: {}",
device.name
);
Some(cap)
} else {
// Get default interface if none specified
log::info!("NetworkMonitor::new - Looking up default device");
let device = Device::lookup()?.ok_or_else(|| anyhow!("No default device found"))?;
log::info!("NetworkMonitor::new - Default device found: {}", device.name);
log::info!(
"NetworkMonitor::new - Default device found: {}",
device.name
);
info!("Opening capture on default interface: {}", device.name);
let cap = Capture::from_device(device.clone())? // Clone device for logging
@@ -341,36 +346,30 @@ impl NetworkMonitor {
.snaplen(65535)
.promisc(true)
.open()?;
log::info!("NetworkMonitor::new - Capture opened on default interface: {}", device.name);
log::info!(
"NetworkMonitor::new - Capture opened on default interface: {}",
device.name
);
Some(cap)
};
// Set BPF filter to capture all TCP and UDP traffic
// Set BPF filter to capture all TCP, UDP and ICMP traffic
if let Some(ref mut cap) = capture {
log::info!("NetworkMonitor::new - Applying BPF filter 'tcp or udp'");
match cap.filter("tcp or udp", true) {
Ok(_) => info!("NetworkMonitor::new - Applied packet filter: tcp or udp"),
log::info!("NetworkMonitor::new - Applying BPF filter 'tcp or udp or icmp'");
match cap.filter("tcp or udp or icmp", true) {
Ok(_) => info!("NetworkMonitor::new - Applied packet filter: tcp or udp or icmp"),
Err(e) => error!("NetworkMonitor::new - Error setting packet filter: {}", e),
}
}
// Try to load MaxMind database if available - logic removed as geo_db field is removed
// let geo_db = std::fs::read("GeoLite2-City.mmdb")
// .ok()
// .map(|data| maxminddb::Reader::from_source(data).ok())
// .flatten();
// if geo_db.is_some() {
// info!("Loaded MaxMind GeoIP database");
// } else {
// debug!("MaxMind GeoIP database not found");
// }
// Get all local IP addresses
log::info!("NetworkMonitor::new - Getting local IP addresses using pnet_datalink");
let mut local_ips = std::collections::HashSet::new();
let pnet_interfaces = pnet_datalink::interfaces();
log::info!("NetworkMonitor::new - pnet_datalink::interfaces() returned {} interfaces", pnet_interfaces.len());
log::info!(
"NetworkMonitor::new - pnet_datalink::interfaces() returned {} interfaces",
pnet_interfaces.len()
);
for iface in pnet_interfaces {
for ip_network in iface.ips {
local_ips.insert(ip_network.ip());
@@ -384,40 +383,36 @@ impl NetworkMonitor {
}
// Initialize ServiceLookup
// TODO: Consider making the path configurable, e.g., via Config struct or environment variable.
let services_file_path = "assets/services";
log::info!("NetworkMonitor::new - Attempting to load service definitions from: {}", services_file_path);
log::info!(
"NetworkMonitor::new - Attempting to load service definitions from: {}",
services_file_path
);
let service_lookup = match ServiceLookup::new(services_file_path) {
Ok(sl) => sl,
Err(e) => {
error!("NetworkMonitor::new - Failed to load service definitions from '{}': {}. Proceeding without service names.", services_file_path, e);
// Fallback to an empty ServiceLookup if loading fails
ServiceLookup { services: HashMap::new() }
ServiceLookup {
services: HashMap::new(),
}
}
};
log::info!("NetworkMonitor::new - Initialization complete");
Ok(Self {
interface,
capture,
local_ips,
service_lookup, // Added service_lookup
service_lookup,
connections: HashMap::new(),
// geo_db, // Field removed
// collect_process_info: false, // Removed
filter_localhost,
// last_packet_check and initial_packet_processing_done removed
})
}
// set_collect_process_info method removed
/// Get active connections
pub fn get_connections(&mut self) -> Result<Vec<Connection>> {
log::debug!("NetworkMonitor::get_connections - Starting to fetch connections (without packet processing)");
// Packet processing is now handled externally.
// Get connections from system methods (ss, netstat)
let mut platform_conns_vec = Vec::new();
log::debug!("NetworkMonitor::get_connections - Attempting to populate platform_conns_vec via get_platform_connections.");
@@ -425,7 +420,6 @@ impl NetworkMonitor {
Ok(_) => log::debug!("NetworkMonitor::get_connections - get_platform_connections call completed. platform_conns_vec now has {} entries.", platform_conns_vec.len()),
Err(e) => {
log::error!("NetworkMonitor::get_connections - Error from get_platform_connections: {}. platform_conns_vec might be empty or partially filled.", e);
// Continue with whatever platform_conns_vec contains.
}
}
if platform_conns_vec.is_empty() {
@@ -440,12 +434,18 @@ impl NetworkMonitor {
// These have the byte counts and rate_history.
for (key, packet_conn) in &self.connections {
// Consider active connections or those also seen by platform tools for merging
if packet_conn.is_active() || platform_conns_vec.iter().any(|pc| self.get_connection_key_for_merge(pc) == *key) {
merged_connections.insert(key.clone(), packet_conn.clone());
if packet_conn.is_active()
|| platform_conns_vec
.iter()
.any(|pc| self.get_connection_key_for_merge(pc) == *key)
{
merged_connections.insert(key.clone(), packet_conn.clone());
}
}
log::debug!("NetworkMonitor::get_connections - Initial merge map size from packet data: {}", merged_connections.len());
log::debug!(
"NetworkMonitor::get_connections - Initial merge map size from packet data: {}",
merged_connections.len()
);
// 2. Iterate through platform connections. If a match is found in merged_connections,
// update its state, PID, and process name. If not found, add it.
@@ -461,19 +461,20 @@ impl NetworkMonitor {
if platform_conn.process_name.is_some() {
existing_conn.process_name = platform_conn.process_name;
}
// Crucially, existing_conn.bytes_sent, .bytes_received, .packets_sent, .packets_received,
// and .rate_history (from packet capture) are preserved.
} else {
// Connection only found by platform tools, add it.
// It will have 0 byte/packet counts and empty rate_history initially,
// as these are primarily populated by packet capture.
log::debug!("NetworkMonitor::get_connections - Adding new connection from platform data: {:?}", platform_conn);
merged_connections.insert(key, platform_conn);
}
}
log::debug!("NetworkMonitor::get_connections - Merge map size after platform data: {}", merged_connections.len());
log::debug!(
"NetworkMonitor::get_connections - Merge map size after platform data: {}",
merged_connections.len()
);
let mut result_connections: Vec<Connection> = merged_connections.into_values().collect();
// For connections that might still lack PID/process name (e.g., purely from packets
// and platform lookup failed before, or purely from platform and it didn't have it),
// try one more pass of platform-specific PID resolution.
@@ -501,252 +502,351 @@ impl NetworkMonitor {
set_connection_service_name_for_connection(conn, &self.service_lookup);
}
log::info!("NetworkMonitor::get_connections - Finished fetching connections. Total: {}", result_connections.len());
log::info!(
"NetworkMonitor::get_connections - Finished fetching connections. Total: {}",
result_connections.len()
);
Ok(result_connections)
}
// Moved set_connection_service_name to be a free function to avoid borrow checker issues in process_packets.
fn determine_addresses(
&self,
src_ip: IpAddr,
src_port: u16,
dst_ip: IpAddr,
dst_port: u16,
is_outgoing: bool,
) -> (SocketAddr, SocketAddr) {
if is_outgoing {
(
SocketAddr::new(src_ip, src_port),
SocketAddr::new(dst_ip, dst_port),
)
} else {
(SocketAddr::new(dst_ip, 0), SocketAddr::new(src_ip, 0))
}
}
/// Process packets from capture
pub fn process_packets(&mut self) -> Result<()> {
log::debug!("NetworkMonitor::process_packets - Entered process_packets");
// Define a helper function to process a single packet
// This avoids some borrowing issues with self.local_ips if it were passed directly
// Instead, we pass the HashMap, the local_ips set, and the service_lookup.
let process_single_packet = |data: &[u8],
monitor_connections: &mut HashMap<String, Connection>,
local_ips_set: &std::collections::HashSet<IpAddr>,
_interface: &Option<String>,
service_lookup: &ServiceLookup| { // Added service_lookup
// Check if it's an ethernet frame
if data.len() < 14 {
return; // Too short for Ethernet
}
// We need to break this up into a separate helper function that uses owned data to avoid Rust's borrowing issues
self.process_packets_impl()
}
// Skip Ethernet header (14 bytes) to get to IP header
let ip_data = &data[14..];
/// Implementation helper for process_packets to avoid borrowing issues
fn process_packets_impl(&mut self) -> Result<()> {
// First, gather all the packets into a Vec to avoid borrowing issues
if self.capture.is_none() {
log::warn!("NetworkMonitor::process_packets_impl - No capture device available.");
return Ok(());
}
// Make sure we have enough data for an IP header
if ip_data.len() < 20 {
return; // Too short for IP
}
let mut packets_to_process = Vec::new();
let loop_start_time = Instant::now();
// Check if it's IPv4
let version_ihl = ip_data[0];
let version = version_ihl >> 4;
if version != 4 {
return; // Not IPv4
}
// Process a moderate number of packets per call to balance responsiveness and data capture
const MAX_PACKETS_PER_CALL: usize = 100;
// Extract protocol (TCP=6, UDP=17)
let protocol = ip_data[9];
// Extract source and destination IP
let src_ip = IpAddr::from([ip_data[12], ip_data[13], ip_data[14], ip_data[15]]);
let dst_ip = IpAddr::from([ip_data[16], ip_data[17], ip_data[18], ip_data[19]]);
// Calculate IP header length
let ihl = version_ihl & 0x0F;
let ip_header_len = (ihl as usize) * 4;
// Skip to TCP/UDP header
let transport_data = &ip_data[ip_header_len..];
if transport_data.len() < 8 {
return; // Too short for TCP/UDP
}
// Determine if packet is outgoing based on IP address
let is_outgoing = local_ips_set.contains(&src_ip);
match protocol {
6 => {
// TCP
if transport_data.len() < 20 {
return; // Too short for TCP
}
// Extract ports
let src_port = ((transport_data[0] as u16) << 8) | transport_data[1] as u16;
let dst_port = ((transport_data[2] as u16) << 8) | transport_data[3] as u16;
// Extract TCP flags
let flags = transport_data[13];
// Determine connection state from flags
let state = match flags {
0x02 => ConnectionState::SynSent, // SYN
0x12 => ConnectionState::SynReceived, // SYN+ACK
0x10 => ConnectionState::Established, // ACK
0x01 => ConnectionState::FinWait1, // FIN
0x11 => ConnectionState::FinWait2, // FIN+ACK
0x04 => ConnectionState::Reset, // RST
0x14 => ConnectionState::Closing, // RST+ACK
_ => ConnectionState::Established, // Default to established
};
// Determine local and remote addresses
let (local_addr, remote_addr) = if is_outgoing {
(
SocketAddr::new(src_ip, src_port),
SocketAddr::new(dst_ip, dst_port),
)
} else {
(
SocketAddr::new(dst_ip, dst_port),
SocketAddr::new(src_ip, src_port),
)
};
// Create or update connection
let conn_protocol = Protocol::TCP; // Define protocol for key generation
let conn_key = format!(
"{:?}:{}-{:?}:{}",
conn_protocol, // Use actual protocol
local_addr,
conn_protocol, // Use actual protocol
remote_addr
);
if let Some(conn) = monitor_connections.get_mut(&conn_key) {
conn.last_activity = SystemTime::now();
if is_outgoing {
conn.packets_sent += 1;
conn.bytes_sent += data.len() as u64;
} else {
conn.packets_received += 1;
conn.bytes_received += data.len() as u64;
}
conn.state = state;
conn.rate_history.push((Instant::now(), conn.bytes_sent, conn.bytes_received));
// Update service name for existing connection
set_connection_service_name_for_connection(conn, service_lookup);
} else {
let mut new_conn =
Connection::new(Protocol::TCP, local_addr, remote_addr, state);
new_conn.last_activity = SystemTime::now();
if is_outgoing {
new_conn.packets_sent += 1;
new_conn.bytes_sent += data.len() as u64;
} else {
new_conn.packets_received += 1;
new_conn.bytes_received += data.len() as u64;
}
new_conn.rate_history.push((Instant::now(), new_conn.bytes_sent, new_conn.bytes_received));
// Set service name for new connection before inserting
set_connection_service_name_for_connection(&mut new_conn, service_lookup);
monitor_connections.insert(conn_key, new_conn);
}
}
17 => {
// UDP
// Extract ports
let src_port = ((transport_data[0] as u16) << 8) | transport_data[1] as u16;
let dst_port = ((transport_data[2] as u16) << 8) | transport_data[3] as u16;
// Determine local and remote addresses
let (local_addr, remote_addr) = if is_outgoing {
(
SocketAddr::new(src_ip, src_port),
SocketAddr::new(dst_ip, dst_port),
)
} else {
(
SocketAddr::new(dst_ip, dst_port),
SocketAddr::new(src_ip, src_port),
)
};
// Create or update connection
let conn_protocol = Protocol::UDP; // Define protocol for key generation
let conn_key = format!(
"{:?}:{}-{:?}:{}",
conn_protocol, // Use actual protocol
local_addr,
conn_protocol, // Use actual protocol
remote_addr
);
if let Some(conn) = monitor_connections.get_mut(&conn_key) {
conn.last_activity = SystemTime::now();
if is_outgoing {
conn.packets_sent += 1;
conn.bytes_sent += data.len() as u64;
} else {
conn.packets_received += 1;
conn.bytes_received += data.len() as u64;
}
conn.rate_history.push((Instant::now(), conn.bytes_sent, conn.bytes_received));
// Update service name for existing connection
set_connection_service_name_for_connection(conn, service_lookup);
} else {
let mut new_conn = Connection::new(
Protocol::UDP,
local_addr,
remote_addr,
ConnectionState::Unknown,
);
new_conn.last_activity = SystemTime::now();
if is_outgoing {
new_conn.packets_sent += 1;
new_conn.bytes_sent += data.len() as u64;
} else {
new_conn.packets_received += 1;
new_conn.bytes_received += data.len() as u64;
}
new_conn.rate_history.push((Instant::now(), new_conn.bytes_sent, new_conn.bytes_received));
// Set service name for new connection before inserting
set_connection_service_name_for_connection(&mut new_conn, service_lookup);
monitor_connections.insert(conn_key, new_conn);
}
}
_ => {} // Ignore other protocols
} // This closes the `match protocol`
}; // This closes the `process_single_packet` closure definition
// Removed initial_packet_processing_done logic and last_packet_check cooldown.
// The loop will now attempt to process packets more continuously,
// controlled by the sleep interval in the app.rs background thread.
// Get packets from the capture
// Collect packets first to avoid borrowing issues
if let Some(ref mut cap) = self.capture {
log::debug!("NetworkMonitor::process_packets - Starting packet processing loop (up to {} iterations)", MAX_PACKETS_PER_CALL);
let loop_start_time = Instant::now();
let mut packets_processed_in_loop = 0;
// Process a moderate number of packets per call to balance responsiveness and data capture
const MAX_PACKETS_PER_CALL: usize = 100; // Current value
log::debug!("NetworkMonitor::process_packets_impl - Starting packet collection loop");
for i in 0..MAX_PACKETS_PER_CALL {
match cap.next_packet() {
Ok(packet) => {
packets_processed_in_loop += 1;
// Use the local helper function to avoid borrowing issues
process_single_packet(packet.data, &mut self.connections, &self.local_ips, &self.interface, &self.service_lookup);
// Clone the packet data into our Vec
packets_to_process.push(packet.data.to_vec());
}
Err(pcap::Error::TimeoutExpired) => {
// This is expected if timeout(0) is working and no packets are available
log::trace!("NetworkMonitor::process_packets - cap.next_packet() timed out (iteration {})", i);
log::trace!("NetworkMonitor::process_packets_impl - cap.next_packet() timed out (iteration {})", i);
break; // No more packets for now, exit loop
}
Err(e) => {
error!("NetworkMonitor::process_packets - Error reading packet (iteration {}): {}", i, e);
error!("NetworkMonitor::process_packets_impl - Error reading packet (iteration {}): {}", i, e);
break; // Error reading packet
}
}
}
let loop_duration = loop_start_time.elapsed();
log::debug!(
"NetworkMonitor::process_packets - Packet processing loop finished in {:?}. Packets processed: {}/{} iterations.",
loop_duration,
packets_processed_in_loop,
MAX_PACKETS_PER_CALL
);
} else {
log::warn!("NetworkMonitor::process_packets - No capture device available.");
}
log::debug!("NetworkMonitor::process_packets - Exiting process_packets");
let packets_count = packets_to_process.len();
log::debug!(
"NetworkMonitor::process_packets_impl - Collected {} packets to process",
packets_count
);
// Now process all the collected packets
for packet_data in packets_to_process {
self.process_single_packet(&packet_data);
}
let loop_duration = loop_start_time.elapsed();
log::debug!(
"NetworkMonitor::process_packets_impl - Packet processing finished in {:?}. Processed {} packets.",
loop_duration,
packets_count
);
Ok(())
}
/// Process a single packet
fn process_single_packet(&mut self, data: &[u8]) {
// Check if it's an ethernet frame
if data.len() < 14 {
return; // Too short for Ethernet
}
// Skip Ethernet header (14 bytes) to get to IP header
let ip_data = &data[14..];
// Make sure we have enough data for an IP header
if ip_data.len() < 20 {
return; // Too short for IP
}
// Check if it's IPv4
let version_ihl = ip_data[0];
let version = version_ihl >> 4;
if version != 4 {
return; // Not IPv4
}
// Extract protocol (TCP=6, UDP=17)
let protocol = ip_data[9];
// Extract source and destination IP
let src_ip = IpAddr::from([ip_data[12], ip_data[13], ip_data[14], ip_data[15]]);
let dst_ip = IpAddr::from([ip_data[16], ip_data[17], ip_data[18], ip_data[19]]);
// Calculate IP header length
let ihl = version_ihl & 0x0F;
let ip_header_len = (ihl as usize) * 4;
// Skip to TCP/UDP header
let transport_data = &ip_data[ip_header_len..];
if transport_data.len() < 8 {
return; // Too short for TCP/UDP
}
// Determine if packet is outgoing based on IP address
let is_outgoing = self.local_ips.contains(&src_ip);
match protocol {
1 => self.process_icmp_packet(data, is_outgoing, transport_data, src_ip, dst_ip),
6 => self.process_tcp_packet(data, is_outgoing, transport_data, src_ip, dst_ip),
17 => self.process_udp_packet(data, is_outgoing, transport_data, src_ip, dst_ip),
_ => {} // Ignore other protocols
}
}
/// Process an ICMP packet
fn process_icmp_packet(
&mut self,
data: &[u8],
is_outgoing: bool,
transport_data: &[u8],
src_ip: IpAddr,
dst_ip: IpAddr,
) {
// Extract ICMP type
let icmp_type = transport_data[0];
let state = match icmp_type {
8 => ConnectionState::IcmpEchoRequest,
0 => ConnectionState::IcmpEchoReply,
3 => ConnectionState::IcmpDestinationUnreachable,
11 => ConnectionState::IcmpTimeExceeded,
_ => ConnectionState::Unknown,
};
let (local_addr, remote_addr) = self.determine_addresses(src_ip, 0, dst_ip, 0, is_outgoing);
// Create or update connection
let conn_protocol = Protocol::ICMP;
let conn_key = format!(
"{:?}:{}-{:?}:{}",
conn_protocol, local_addr, conn_protocol, remote_addr
);
if let Some(conn) = self.connections.get_mut(&conn_key) {
conn.last_activity = SystemTime::now();
if is_outgoing {
conn.packets_sent += 1;
conn.bytes_sent += data.len() as u64;
} else {
conn.packets_received += 1;
conn.bytes_received += data.len() as u64;
}
conn.state = state;
conn.rate_history
.push((Instant::now(), conn.bytes_sent, conn.bytes_received));
// Update service name for existing connection
set_connection_service_name_for_connection(conn, &self.service_lookup);
} else {
let mut new_conn = Connection::new(Protocol::ICMP, local_addr, remote_addr, state);
new_conn.last_activity = SystemTime::now();
if is_outgoing {
new_conn.packets_sent += 1;
new_conn.bytes_sent += data.len() as u64;
} else {
new_conn.packets_received += 1;
new_conn.bytes_received += data.len() as u64;
}
new_conn.rate_history.push((
Instant::now(),
new_conn.bytes_sent,
new_conn.bytes_received,
));
// Set service name for new connection before inserting
set_connection_service_name_for_connection(&mut new_conn, &self.service_lookup);
self.connections.insert(conn_key, new_conn);
}
}
/// Process a TCP packet
fn process_tcp_packet(
&mut self,
data: &[u8],
is_outgoing: bool,
transport_data: &[u8],
src_ip: IpAddr,
dst_ip: IpAddr,
) {
if transport_data.len() < 20 {
return; // Too short for TCP
}
// Extract ports
let src_port = ((transport_data[0] as u16) << 8) | transport_data[1] as u16;
let dst_port = ((transport_data[2] as u16) << 8) | transport_data[3] as u16;
// Extract TCP flags
let flags = transport_data[13];
// Determine connection state from flags
let state = match flags {
0x02 => ConnectionState::SynSent, // SYN
0x12 => ConnectionState::SynReceived, // SYN+ACK
0x10 => ConnectionState::Established, // ACK
0x01 => ConnectionState::FinWait1, // FIN
0x11 => ConnectionState::FinWait2, // FIN+ACK
0x04 => ConnectionState::Reset, // RST
0x14 => ConnectionState::Closing, // RST+ACK
_ => ConnectionState::Established, // Default to established
};
// Determine local and remote addresses
let (local_addr, remote_addr) =
self.determine_addresses(src_ip, src_port, dst_ip, dst_port, is_outgoing);
// Create or update connection
let conn_protocol = Protocol::TCP;
let conn_key = format!(
"{:?}:{}-{:?}:{}",
conn_protocol, local_addr, conn_protocol, remote_addr
);
if let Some(conn) = self.connections.get_mut(&conn_key) {
conn.last_activity = SystemTime::now();
if is_outgoing {
conn.packets_sent += 1;
conn.bytes_sent += data.len() as u64;
} else {
conn.packets_received += 1;
conn.bytes_received += data.len() as u64;
}
conn.state = state;
conn.rate_history
.push((Instant::now(), conn.bytes_sent, conn.bytes_received));
// Update service name for existing connection
set_connection_service_name_for_connection(conn, &self.service_lookup);
} else {
let mut new_conn = Connection::new(Protocol::TCP, local_addr, remote_addr, state);
new_conn.last_activity = SystemTime::now();
if is_outgoing {
new_conn.packets_sent += 1;
new_conn.bytes_sent += data.len() as u64;
} else {
new_conn.packets_received += 1;
new_conn.bytes_received += data.len() as u64;
}
new_conn.rate_history.push((
Instant::now(),
new_conn.bytes_sent,
new_conn.bytes_received,
));
// Set service name for new connection before inserting
set_connection_service_name_for_connection(&mut new_conn, &self.service_lookup);
self.connections.insert(conn_key, new_conn);
}
}
/// Process a UDP packet
fn process_udp_packet(
&mut self,
data: &[u8],
is_outgoing: bool,
transport_data: &[u8],
src_ip: IpAddr,
dst_ip: IpAddr,
) {
// Extract ports
let src_port = ((transport_data[0] as u16) << 8) | transport_data[1] as u16;
let dst_port = ((transport_data[2] as u16) << 8) | transport_data[3] as u16;
// Determine local and remote addresses
let (local_addr, remote_addr) =
self.determine_addresses(src_ip, src_port, dst_ip, dst_port, is_outgoing);
// Create or update connection
let conn_protocol = Protocol::UDP;
let conn_key = format!(
"{:?}:{}-{:?}:{}",
conn_protocol, local_addr, conn_protocol, remote_addr
);
if let Some(conn) = self.connections.get_mut(&conn_key) {
conn.last_activity = SystemTime::now();
if is_outgoing {
conn.packets_sent += 1;
conn.bytes_sent += data.len() as u64;
} else {
conn.packets_received += 1;
conn.bytes_received += data.len() as u64;
}
conn.rate_history
.push((Instant::now(), conn.bytes_sent, conn.bytes_received));
// Update service name for existing connection
set_connection_service_name_for_connection(conn, &self.service_lookup);
} else {
let mut new_conn = Connection::new(
Protocol::UDP,
local_addr,
remote_addr,
ConnectionState::Unknown,
);
new_conn.last_activity = SystemTime::now();
if is_outgoing {
new_conn.packets_sent += 1;
new_conn.bytes_sent += data.len() as u64;
} else {
new_conn.packets_received += 1;
new_conn.bytes_received += data.len() as u64;
}
new_conn.rate_history.push((
Instant::now(),
new_conn.bytes_sent,
new_conn.bytes_received,
));
// Set service name for new connection before inserting
set_connection_service_name_for_connection(&mut new_conn, &self.service_lookup);
self.connections.insert(conn_key, new_conn);
}
}
/// We don't need this method anymore since packet processing is done inline
// fn process_packet(&mut self, packet: Packet) { ... }
@@ -826,12 +926,13 @@ impl NetworkMonitor {
let (host_str_candidate, port_str) = addr_str.split_at(last_colon_idx);
let port_str = &port_str[1..]; // Skip the colon
let host_str = if host_str_candidate.starts_with('[') && host_str_candidate.ends_with(']') {
// IPv6 like [::1]
&host_str_candidate[1..host_str_candidate.len() - 1]
} else {
host_str_candidate
};
let host_str =
if host_str_candidate.starts_with('[') && host_str_candidate.ends_with(']') {
// IPv6 like [::1]
&host_str_candidate[1..host_str_candidate.len() - 1]
} else {
host_str_candidate
};
if let Ok(ip_addr) = host_str.parse::<std::net::IpAddr>() {
let port_num = if port_str == "*" {