From fb1cec982d3875638914ff5ec260e267aacc74ea Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Mon, 16 Sep 2024 12:50:53 +0300 Subject: [PATCH 1/2] fix: Do CC reaction before `largest_acked` Packets are only declared as lost relative to `largest_acked`. If we hit a PTO while we don't have a largest_acked yet, also do a congestion control reaction (because otherwise none would happen). Broken out of #1998 --- neqo-transport/src/cc/classic_cc.rs | 70 ++++++++++++++--------------- neqo-transport/src/cc/mod.rs | 5 +++ neqo-transport/src/path.rs | 12 +++++ neqo-transport/src/recovery/mod.rs | 17 +++++-- neqo-transport/src/sender.rs | 7 +++ 5 files changed, 73 insertions(+), 38 deletions(-) diff --git a/neqo-transport/src/cc/classic_cc.rs b/neqo-transport/src/cc/classic_cc.rs index 1130178bc0..35990755e5 100644 --- a/neqo-transport/src/cc/classic_cc.rs +++ b/neqo-transport/src/cc/classic_cc.rs @@ -337,6 +337,41 @@ impl CongestionControl for ClassicCongestionControl { congestion || persistent_congestion } + /// Handle a congestion event. + /// Returns true if this was a true congestion event. + fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool { + // Start a new congestion event if lost or ECN CE marked packet was sent + // after the start of the previous congestion recovery period. + if !self.after_recovery_start(last_packet) { + return false; + } + + let (cwnd, acked_bytes) = self.cc_algorithm.reduce_cwnd( + self.congestion_window, + self.acked_bytes, + self.max_datagram_size(), + ); + self.congestion_window = max(cwnd, self.cwnd_min()); + self.acked_bytes = acked_bytes; + self.ssthresh = self.congestion_window; + qdebug!( + [self], + "Cong event -> recovery; cwnd {}, ssthresh {}", + self.congestion_window, + self.ssthresh + ); + qlog::metrics_updated( + &self.qlog, + &[ + QlogMetric::CongestionWindow(self.congestion_window), + QlogMetric::SsThresh(self.ssthresh), + QlogMetric::InRecovery(true), + ], + ); + self.set_state(State::RecoveryStart); + true + } + /// Report received ECN CE mark(s) to the congestion controller as a /// congestion event. /// @@ -537,41 +572,6 @@ impl ClassicCongestionControl { !self.state.transient() && self.recovery_start.map_or(true, |pn| packet.pn() >= pn) } - /// Handle a congestion event. - /// Returns true if this was a true congestion event. - fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool { - // Start a new congestion event if lost or ECN CE marked packet was sent - // after the start of the previous congestion recovery period. - if !self.after_recovery_start(last_packet) { - return false; - } - - let (cwnd, acked_bytes) = self.cc_algorithm.reduce_cwnd( - self.congestion_window, - self.acked_bytes, - self.max_datagram_size(), - ); - self.congestion_window = max(cwnd, self.cwnd_min()); - self.acked_bytes = acked_bytes; - self.ssthresh = self.congestion_window; - qdebug!( - [self], - "Cong event -> recovery; cwnd {}, ssthresh {}", - self.congestion_window, - self.ssthresh - ); - qlog::metrics_updated( - &self.qlog, - &[ - QlogMetric::CongestionWindow(self.congestion_window), - QlogMetric::SsThresh(self.ssthresh), - QlogMetric::InRecovery(true), - ], - ); - self.set_state(State::RecoveryStart); - true - } - fn app_limited(&self) -> bool { if self.bytes_in_flight >= self.congestion_window { false diff --git a/neqo-transport/src/cc/mod.rs b/neqo-transport/src/cc/mod.rs index bbb47c4fd0..43eed5d26c 100644 --- a/neqo-transport/src/cc/mod.rs +++ b/neqo-transport/src/cc/mod.rs @@ -62,6 +62,11 @@ pub trait CongestionControl: Display + Debug { lost_packets: &[SentPacket], ) -> bool; + /// Initiate a congestion response. + /// + /// Returns true if the congestion window was reduced. + fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool; + /// Returns true if the congestion window was reduced. fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket) -> bool; diff --git a/neqo-transport/src/path.rs b/neqo-transport/src/path.rs index 35d29f0253..80e85ec428 100644 --- a/neqo-transport/src/path.rs +++ b/neqo-transport/src/path.rs @@ -1040,6 +1040,18 @@ impl Path { } } + /// Initiate a congestion response. + /// + /// Returns true if the congestion window was reduced. + pub fn on_congestion_event(&mut self, lost_packets: &[SentPacket], stats: &mut Stats) -> bool { + if let Some(last) = lost_packets.last() { + self.ecn_info.on_packets_lost(lost_packets, stats); + self.sender.on_congestion_event(last) + } else { + false + } + } + /// Determine whether we should be setting a PTO for this path. This is true when either the /// path is valid or when there is enough remaining in the amplification limit to fit a /// full-sized path (i.e., the path MTU). diff --git a/neqo-transport/src/recovery/mod.rs b/neqo-transport/src/recovery/mod.rs index f74aa11337..06906e6220 100644 --- a/neqo-transport/src/recovery/mod.rs +++ b/neqo-transport/src/recovery/mod.rs @@ -827,12 +827,12 @@ impl LossRecovery { /// When it has, mark a few packets as "lost" for the purposes of having frames /// regenerated in subsequent packets. The packets aren't truly lost, so /// we have to clone the `SentPacket` instance. - fn maybe_fire_pto(&mut self, rtt: &RttEstimate, now: Instant, lost: &mut Vec) { + fn maybe_fire_pto(&mut self, path: &PathRef, now: Instant, lost: &mut Vec) { let mut pto_space = None; // The spaces in which we will allow probing. let mut allow_probes = PacketNumberSpaceSet::default(); for pn_space in PacketNumberSpace::iter() { - if let Some(t) = self.pto_time(rtt, *pn_space) { + if let Some(t) = self.pto_time(path.borrow().rtt(), *pn_space) { allow_probes[*pn_space] = true; if t <= now { qdebug!([self], "PTO timer fired for {}", pn_space); @@ -852,6 +852,17 @@ impl LossRecovery { // pto_time to increase which might cause PTO for later packet number spaces to not fire. if let Some(pn_space) = pto_space { qtrace!([self], "PTO {}, probing {:?}", pn_space, allow_probes); + // Packets are only declared as lost relative to `largest_acked`. If we hit a PTO while + // we don't have a largest_acked yet, also do a congestion control reaction (because + // otherwise none would happen). + if self + .spaces + .get(pn_space) + .map_or(false, |space| space.largest_acked.is_none()) + { + path.borrow_mut() + .on_congestion_event(lost, &mut self.stats.borrow_mut()); + } self.fire_pto(pn_space, allow_probes); } } @@ -882,7 +893,7 @@ impl LossRecovery { } self.stats.borrow_mut().lost += lost_packets.len(); - self.maybe_fire_pto(primary_path.borrow().rtt(), now, &mut lost_packets); + self.maybe_fire_pto(primary_path, now, &mut lost_packets); lost_packets } diff --git a/neqo-transport/src/sender.rs b/neqo-transport/src/sender.rs index a9ead627aa..20213cc11e 100644 --- a/neqo-transport/src/sender.rs +++ b/neqo-transport/src/sender.rs @@ -113,6 +113,13 @@ impl PacketSender { self.maybe_update_pacer_mtu(); } + /// Initiate a congestion response. + /// + /// Returns true if the congestion window was reduced. + pub fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool { + self.cc.on_congestion_event(last_packet) + } + /// Called when packets are lost. Returns true if the congestion window was reduced. pub fn on_packets_lost( &mut self, From 8ae64d46310077f211da77d5fd2de0b9b451d89c Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Mon, 25 Nov 2024 09:14:01 +0200 Subject: [PATCH 2/2] Fix merge --- neqo-transport/src/cc/classic_cc.rs | 5 +++-- neqo-transport/src/cc/mod.rs | 2 +- neqo-transport/src/path.rs | 9 +++++++-- neqo-transport/src/recovery/mod.rs | 4 ++-- neqo-transport/src/sender.rs | 4 ++-- 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/neqo-transport/src/cc/classic_cc.rs b/neqo-transport/src/cc/classic_cc.rs index f051662b25..e221810298 100644 --- a/neqo-transport/src/cc/classic_cc.rs +++ b/neqo-transport/src/cc/classic_cc.rs @@ -343,7 +343,7 @@ impl CongestionControl for ClassicCongestionControl { /// Handle a congestion event. /// Returns true if this was a true congestion event. - fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool { + fn on_congestion_event(&mut self, last_packet: &SentPacket, now: Instant) -> bool { // Start a new congestion event if lost or ECN CE marked packet was sent // after the start of the previous congestion recovery period. if !self.after_recovery_start(last_packet) { @@ -371,8 +371,9 @@ impl CongestionControl for ClassicCongestionControl { QlogMetric::SsThresh(self.ssthresh), QlogMetric::InRecovery(true), ], + now, ); - self.set_state(State::RecoveryStart); + self.set_state(State::RecoveryStart, now); true } diff --git a/neqo-transport/src/cc/mod.rs b/neqo-transport/src/cc/mod.rs index ab674fe978..15fa9e41cb 100644 --- a/neqo-transport/src/cc/mod.rs +++ b/neqo-transport/src/cc/mod.rs @@ -66,7 +66,7 @@ pub trait CongestionControl: Display + Debug { /// Initiate a congestion response. /// /// Returns true if the congestion window was reduced. - fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool; + fn on_congestion_event(&mut self, last_packet: &SentPacket, now: Instant) -> bool; /// Returns true if the congestion window was reduced. fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket, now: Instant) -> bool; diff --git a/neqo-transport/src/path.rs b/neqo-transport/src/path.rs index 609b6a0be6..a0f1aa46cc 100644 --- a/neqo-transport/src/path.rs +++ b/neqo-transport/src/path.rs @@ -999,10 +999,15 @@ impl Path { /// Initiate a congestion response. /// /// Returns true if the congestion window was reduced. - pub fn on_congestion_event(&mut self, lost_packets: &[SentPacket], stats: &mut Stats) -> bool { + pub fn on_congestion_event( + &mut self, + lost_packets: &[SentPacket], + stats: &mut Stats, + now: Instant, + ) -> bool { if let Some(last) = lost_packets.last() { self.ecn_info.on_packets_lost(lost_packets, stats); - self.sender.on_congestion_event(last) + self.sender.on_congestion_event(last, now) } else { false } diff --git a/neqo-transport/src/recovery/mod.rs b/neqo-transport/src/recovery/mod.rs index 184e22adf5..797fd35428 100644 --- a/neqo-transport/src/recovery/mod.rs +++ b/neqo-transport/src/recovery/mod.rs @@ -865,10 +865,10 @@ impl LossRecovery { if self .spaces .get(pn_space) - .map_or(false, |space| space.largest_acked.is_none()) + .is_some_and(|space| space.largest_acked.is_none()) { path.borrow_mut() - .on_congestion_event(lost, &mut self.stats.borrow_mut()); + .on_congestion_event(lost, &mut self.stats.borrow_mut(), now); } self.fire_pto(pn_space, allow_probes, now); } diff --git a/neqo-transport/src/sender.rs b/neqo-transport/src/sender.rs index 541a72dca5..4009754fd9 100644 --- a/neqo-transport/src/sender.rs +++ b/neqo-transport/src/sender.rs @@ -116,8 +116,8 @@ impl PacketSender { /// Initiate a congestion response. /// /// Returns true if the congestion window was reduced. - pub fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool { - self.cc.on_congestion_event(last_packet) + pub fn on_congestion_event(&mut self, last_packet: &SentPacket, now: Instant) -> bool { + self.cc.on_congestion_event(last_packet, now) } /// Called when packets are lost. Returns true if the congestion window was reduced.