improve traffic monitoring

This commit is contained in:
Marco Cadetg
2025-08-29 10:44:33 +02:00
parent a51acfbe28
commit 4b97828a3f
3 changed files with 302 additions and 18 deletions

View File

@@ -270,6 +270,10 @@ pub fn create_connection_from_packet(parsed: &ParsedPacket, now: SystemTime) ->
conn.created_at = now;
conn.last_activity = now;
// Initialize the rate tracker with the initial byte counts
// This prevents incorrect delta calculation on the first update
conn.rate_tracker.initialize_with_counts(conn.bytes_sent, conn.bytes_received);
conn
}
@@ -631,6 +635,33 @@ mod tests {
assert_eq!(conn.packets_sent, 0);
}
#[test]
fn test_new_connection_rate_tracker_initialization() {
// Test that the rate tracker is properly initialized for new connections
let packet = create_test_packet(true, false);
let conn = create_connection_from_packet(&packet, SystemTime::now());
// The connection should have initial bytes
assert_eq!(conn.bytes_sent, 100);
assert_eq!(conn.bytes_received, 0);
// Now simulate merging another packet
let packet2 = create_test_packet(true, false);
let mut updated_conn = merge_packet_into_connection(conn, &packet2, SystemTime::now());
// Bytes should have increased
assert_eq!(updated_conn.bytes_sent, 200);
assert_eq!(updated_conn.bytes_received, 0);
// Update rates - this should not cause a huge spike
updated_conn.update_rates();
// The rate should be reasonable (not include the initial 100 bytes as a spike)
// Since we just added 100 bytes, the rate should be based on that delta
// not on the full 200 bytes
assert!(updated_conn.current_outgoing_rate_bps >= 0.0);
}
#[test]
fn test_tcp_state_transitions() {
// Test SYN -> SYN_SENT

View File

@@ -549,8 +549,9 @@ impl Default for RateInfo {
#[derive(Debug, Clone)]
struct RateSample {
timestamp: Instant,
bytes_sent: u64,
bytes_received: u64,
// Delta values since last sample
delta_sent: u64,
delta_received: u64,
}
#[derive(Debug, Clone)]
@@ -559,6 +560,9 @@ pub struct RateTracker {
window_duration: Duration,
last_update: Instant,
max_samples: usize,
// Keep track of last byte counts for delta calculation
last_bytes_sent: u64,
last_bytes_received: u64,
}
impl RateTracker {
@@ -572,20 +576,36 @@ impl RateTracker {
window_duration,
last_update: Instant::now(),
max_samples: 100, // Limit memory usage
last_bytes_sent: 0,
last_bytes_received: 0,
}
}
/// Initialize the tracker with initial byte counts
/// This should be called when creating a connection with existing bytes
pub fn initialize_with_counts(&mut self, bytes_sent: u64, bytes_received: u64) {
self.last_bytes_sent = bytes_sent;
self.last_bytes_received = bytes_received;
}
/// Update the rate tracker with new byte counts
pub fn update(&mut self, bytes_sent: u64, bytes_received: u64) {
let now = Instant::now();
// Add new sample
// Calculate deltas since last update
let delta_sent = bytes_sent.saturating_sub(self.last_bytes_sent);
let delta_received = bytes_received.saturating_sub(self.last_bytes_received);
// Add new sample with deltas
self.samples.push_back(RateSample {
timestamp: now,
bytes_sent,
bytes_received,
delta_sent,
delta_received,
});
// Update last values for next delta calculation
self.last_bytes_sent = bytes_sent;
self.last_bytes_received = bytes_received;
self.last_update = now;
// Remove samples outside the window
@@ -612,39 +632,70 @@ impl RateTracker {
/// Get the current incoming rate in bytes per second
pub fn get_incoming_rate_bps(&self) -> f64 {
self.calculate_rate(|sample| sample.bytes_received)
self.calculate_rate_from_deltas(|sample| sample.delta_received)
}
/// Get the current outgoing rate in bytes per second
pub fn get_outgoing_rate_bps(&self) -> f64 {
self.calculate_rate(|sample| sample.bytes_sent)
self.calculate_rate_from_deltas(|sample| sample.delta_sent)
}
/// Calculate rate for a given byte field using the sliding window
fn calculate_rate<F>(&self, byte_getter: F) -> f64
/// Calculate rate using delta values for accurate sliding window calculation
fn calculate_rate_from_deltas<F>(&self, delta_getter: F) -> f64
where
F: Fn(&RateSample) -> u64,
{
if self.samples.len() < 2 {
if self.samples.is_empty() {
return 0.0;
}
// If we only have one sample, we can't calculate a rate yet
if self.samples.len() == 1 {
return 0.0;
}
let oldest = self.samples.front().unwrap();
let newest = self.samples.back().unwrap();
let time_diff = newest
// Calculate the time span of our samples
let time_span = newest
.timestamp
.duration_since(oldest.timestamp)
.as_secs_f64();
// Need at least 100ms of data to avoid division by very small numbers
if time_diff < 0.1 {
if time_span < 0.1 {
return 0.0;
}
let byte_diff = byte_getter(newest).saturating_sub(byte_getter(oldest)) as f64;
// Sum all deltas in the window (skip the first sample as it might have incomplete delta)
let total_bytes: u64 = self.samples
.iter()
.skip(1) // Skip first sample which might have delta from before window
.map(delta_getter)
.sum();
byte_diff / time_diff
// Calculate base rate
let base_rate = total_bytes as f64 / time_span;
// Apply time-based decay more gently, similar to iftop's approach
let now = Instant::now();
let time_since_last_sample = now.duration_since(newest.timestamp).as_secs_f64();
// More gentle decay - start decay after 3 seconds, fully decay by 10 seconds
if time_since_last_sample > 10.0 {
// After 10 seconds of no traffic, rate should be very close to zero
0.0
} else if time_since_last_sample > 3.0 {
// Exponential decay from 3 to 10 seconds
// This creates a more natural falloff similar to iftop
let decay_time = time_since_last_sample - 3.0; // 0 to 7 seconds
let decay_factor = (-decay_time / 3.0).exp(); // Exponential decay
base_rate * decay_factor
} else {
// No decay for first 3 seconds - show full rate
base_rate
}
}
/// Get the age of the oldest sample in the current window
@@ -867,6 +918,23 @@ impl Connection {
};
}
/// Refresh rates without adding new data - useful for idle connections
/// This ensures rates decay to zero when no traffic is flowing
pub fn refresh_rates(&mut self) {
// Just recalculate rates based on current time
// The calculate_rate_from_deltas method now checks sample age
self.current_incoming_rate_bps = self.rate_tracker.get_incoming_rate_bps();
self.current_outgoing_rate_bps = self.rate_tracker.get_outgoing_rate_bps();
// Also update the legacy RateInfo struct
let now = Instant::now();
self.current_rate_bps = RateInfo {
incoming_bps: self.current_incoming_rate_bps,
outgoing_bps: self.current_outgoing_rate_bps,
last_calculation: now,
};
}
/// Get dynamic timeout for this connection based on protocol and state
pub fn get_timeout(&self) -> Duration {
match &self.protocol_state {
@@ -982,11 +1050,11 @@ mod tests {
fn test_rate_tracker_single_update() {
let mut tracker = RateTracker::new();
// First update - no rate yet (need at least 2 samples)
// First update establishes baseline
tracker.update(1000, 500);
assert_eq!(tracker.get_incoming_rate_bps(), 0.0);
assert_eq!(tracker.get_outgoing_rate_bps(), 0.0);
// Test single update
// Test single update - need at least 2 samples for rate
}
#[test]
@@ -1160,6 +1228,161 @@ mod tests {
assert_eq!(tracker.get_incoming_rate_bps(), 0.0);
}
#[test]
fn test_rate_tracker_cumulative_fix() {
// This test verifies the fix for the cumulative byte count issue
let mut tracker = RateTracker::new();
// Simulate a connection that has been running for a while
// with cumulative byte counts
tracker.update(1_000_000, 500_000); // 1MB sent, 500KB received total
thread::sleep(Duration::from_millis(100));
tracker.update(1_100_000, 550_000); // 100KB more sent, 50KB more received
thread::sleep(Duration::from_millis(100));
tracker.update(1_200_000, 600_000); // 100KB more sent, 50KB more received
// The rate should be based on the deltas, not the cumulative values
// We sent 200KB in ~200ms = ~1MB/s, received 100KB in ~200ms = ~500KB/s
let outgoing_rate = tracker.get_outgoing_rate_bps();
let incoming_rate = tracker.get_incoming_rate_bps();
// Should be approximately 1MB/s outgoing (1_000_000 bytes/sec)
assert!(
outgoing_rate > 800_000.0 && outgoing_rate < 1_200_000.0,
"Outgoing rate should be ~1MB/s, got: {}",
outgoing_rate
);
// Should be approximately 500KB/s incoming (500_000 bytes/sec)
assert!(
incoming_rate > 400_000.0 && incoming_rate < 600_000.0,
"Incoming rate should be ~500KB/s, got: {}",
incoming_rate
);
}
#[test]
fn test_rate_tracker_window_sliding() {
// Test that rates are calculated correctly as the window slides
let window_duration = Duration::from_millis(500);
let mut tracker = RateTracker::with_window_duration(window_duration);
// Add initial samples
tracker.update(0, 0);
thread::sleep(Duration::from_millis(100));
tracker.update(100_000, 50_000); // 100KB sent, 50KB received
thread::sleep(Duration::from_millis(100));
tracker.update(200_000, 100_000); // Another 100KB sent, 50KB received
// Wait for window to slide past first samples
thread::sleep(Duration::from_millis(600));
// Add new samples with same rate
tracker.update(300_000, 150_000); // Another 100KB sent, 50KB received
thread::sleep(Duration::from_millis(100));
tracker.update(400_000, 200_000); // Another 100KB sent, 50KB received
// Rate should still be consistent despite window sliding
let outgoing_rate = tracker.get_outgoing_rate_bps();
let incoming_rate = tracker.get_incoming_rate_bps();
// We're sending at ~1MB/s and receiving at ~500KB/s consistently
assert!(
outgoing_rate > 800_000.0 && outgoing_rate < 1_200_000.0,
"Outgoing rate after window slide: {}",
outgoing_rate
);
assert!(
incoming_rate > 400_000.0 && incoming_rate < 600_000.0,
"Incoming rate after window slide: {}",
incoming_rate
);
}
#[test]
fn test_rate_decay_for_idle_connections() {
// Test that rates decay to zero when connections become idle
let mut tracker = RateTracker::new();
// Simulate active traffic
tracker.update(0, 0);
thread::sleep(Duration::from_millis(100));
tracker.update(100_000, 50_000); // 100KB sent, 50KB received
// Should have non-zero rate immediately after traffic
let initial_out = tracker.get_outgoing_rate_bps();
let initial_in = tracker.get_incoming_rate_bps();
assert!(initial_out > 0.0, "Should have outgoing traffic");
assert!(initial_in > 0.0, "Should have incoming traffic");
// Wait 2 seconds (should still show full rate - no decay yet)
thread::sleep(Duration::from_millis(2000));
let still_active_out = tracker.get_outgoing_rate_bps();
let still_active_in = tracker.get_incoming_rate_bps();
// Rates should still be the same (no decay for first 3 seconds)
assert_eq!(still_active_out, initial_out, "Should not decay within 3 seconds");
assert_eq!(still_active_in, initial_in, "Should not decay within 3 seconds");
// Wait until decay starts (total 4 seconds - should start decay)
thread::sleep(Duration::from_millis(2000));
let decayed_out = tracker.get_outgoing_rate_bps();
let decayed_in = tracker.get_incoming_rate_bps();
// Rates should be lower due to decay
assert!(decayed_out < initial_out, "Outgoing rate should start decaying after 3s");
assert!(decayed_in < initial_in, "Incoming rate should start decaying after 3s");
assert!(decayed_out > 0.0, "Should still have some rate at 4s");
// Wait for full decay (total 11 seconds - should be zero)
thread::sleep(Duration::from_millis(7000));
let final_out = tracker.get_outgoing_rate_bps();
let final_in = tracker.get_incoming_rate_bps();
// After 10+ seconds of idle, rates should be zero
assert_eq!(final_out, 0.0, "Outgoing rate should be zero after 10+ seconds idle");
assert_eq!(final_in, 0.0, "Incoming rate should be zero after 10+ seconds idle");
}
#[test]
fn test_connection_refresh_rates() {
// Test that refresh_rates() properly updates cached rate values
let mut conn = create_test_connection();
// Initialize the rate tracker properly
conn.rate_tracker.initialize_with_counts(0, 0);
// Simulate first packet
conn.bytes_sent = 50_000;
conn.bytes_received = 25_000;
conn.update_rates();
thread::sleep(Duration::from_millis(100));
// Simulate more traffic
conn.bytes_sent = 100_000;
conn.bytes_received = 50_000;
conn.update_rates();
// Should have non-zero rates after recent traffic
assert!(conn.current_outgoing_rate_bps > 0.0, "Should have outgoing rate");
assert!(conn.current_incoming_rate_bps > 0.0, "Should have incoming rate");
// Now simulate longer idle time and refresh (need >10s for zero)
thread::sleep(Duration::from_millis(11000));
conn.refresh_rates();
// Rates should be zero after refresh with long idle connection
assert_eq!(conn.current_outgoing_rate_bps, 0.0, "Should be zero after 10+ seconds idle refresh");
assert_eq!(conn.current_incoming_rate_bps, 0.0, "Should be zero after 10+ seconds idle refresh");
}
#[test]
fn test_enhanced_state_display_tcp() {
let mut conn = create_test_connection();