diff --git a/neqo-transport/src/cc/classic_cc.rs b/neqo-transport/src/cc/classic_cc.rs index 0c629afb59..e221810298 100644 --- a/neqo-transport/src/cc/classic_cc.rs +++ b/neqo-transport/src/cc/classic_cc.rs @@ -341,6 +341,42 @@ impl CongestionControl for ClassicCongestionControl { congestion || persistent_congestion } + /// Handle a congestion event. + /// Returns true if this was a true congestion event. + fn on_congestion_event(&mut self, last_packet: &SentPacket, now: Instant) -> bool { + // Start a new congestion event if lost or ECN CE marked packet was sent + // after the start of the previous congestion recovery period. + if !self.after_recovery_start(last_packet) { + return false; + } + + let (cwnd, acked_bytes) = self.cc_algorithm.reduce_cwnd( + self.congestion_window, + self.acked_bytes, + self.max_datagram_size(), + ); + self.congestion_window = max(cwnd, self.cwnd_min()); + self.acked_bytes = acked_bytes; + self.ssthresh = self.congestion_window; + qdebug!( + [self], + "Cong event -> recovery; cwnd {}, ssthresh {}", + self.congestion_window, + self.ssthresh + ); + qlog::metrics_updated( + &self.qlog, + &[ + QlogMetric::CongestionWindow(self.congestion_window), + QlogMetric::SsThresh(self.ssthresh), + QlogMetric::InRecovery(true), + ], + now, + ); + self.set_state(State::RecoveryStart, now); + true + } + /// Report received ECN CE mark(s) to the congestion controller as a /// congestion event. /// @@ -549,42 +585,6 @@ impl ClassicCongestionControl { !self.state.transient() && self.recovery_start.map_or(true, |pn| packet.pn() >= pn) } - /// Handle a congestion event. - /// Returns true if this was a true congestion event. - fn on_congestion_event(&mut self, last_packet: &SentPacket, now: Instant) -> bool { - // Start a new congestion event if lost or ECN CE marked packet was sent - // after the start of the previous congestion recovery period. - if !self.after_recovery_start(last_packet) { - return false; - } - - let (cwnd, acked_bytes) = self.cc_algorithm.reduce_cwnd( - self.congestion_window, - self.acked_bytes, - self.max_datagram_size(), - ); - self.congestion_window = max(cwnd, self.cwnd_min()); - self.acked_bytes = acked_bytes; - self.ssthresh = self.congestion_window; - qdebug!( - [self], - "Cong event -> recovery; cwnd {}, ssthresh {}", - self.congestion_window, - self.ssthresh - ); - qlog::metrics_updated( - &self.qlog, - &[ - QlogMetric::CongestionWindow(self.congestion_window), - QlogMetric::SsThresh(self.ssthresh), - QlogMetric::InRecovery(true), - ], - now, - ); - self.set_state(State::RecoveryStart, now); - true - } - fn app_limited(&self) -> bool { if self.bytes_in_flight >= self.congestion_window { false diff --git a/neqo-transport/src/cc/mod.rs b/neqo-transport/src/cc/mod.rs index 6c862c146f..15fa9e41cb 100644 --- a/neqo-transport/src/cc/mod.rs +++ b/neqo-transport/src/cc/mod.rs @@ -63,6 +63,11 @@ pub trait CongestionControl: Display + Debug { now: Instant, ) -> bool; + /// Initiate a congestion response. + /// + /// Returns true if the congestion window was reduced. + fn on_congestion_event(&mut self, last_packet: &SentPacket, now: Instant) -> bool; + /// Returns true if the congestion window was reduced. fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket, now: Instant) -> bool; diff --git a/neqo-transport/src/path.rs b/neqo-transport/src/path.rs index 7d0aaf62a6..a0f1aa46cc 100644 --- a/neqo-transport/src/path.rs +++ b/neqo-transport/src/path.rs @@ -996,6 +996,23 @@ impl Path { } } + /// Initiate a congestion response. + /// + /// Returns true if the congestion window was reduced. + pub fn on_congestion_event( + &mut self, + lost_packets: &[SentPacket], + stats: &mut Stats, + now: Instant, + ) -> bool { + if let Some(last) = lost_packets.last() { + self.ecn_info.on_packets_lost(lost_packets, stats); + self.sender.on_congestion_event(last, now) + } else { + false + } + } + /// Determine whether we should be setting a PTO for this path. This is true when either the /// path is valid or when there is enough remaining in the amplification limit to fit a /// full-sized path (i.e., the path MTU). diff --git a/neqo-transport/src/recovery/mod.rs b/neqo-transport/src/recovery/mod.rs index 8f89b1cfcd..3ae3ee7c29 100644 --- a/neqo-transport/src/recovery/mod.rs +++ b/neqo-transport/src/recovery/mod.rs @@ -864,12 +864,12 @@ impl LossRecovery { /// When it has, mark a few packets as "lost" for the purposes of having frames /// regenerated in subsequent packets. The packets aren't truly lost, so /// we have to clone the `SentPacket` instance. - fn maybe_fire_pto(&mut self, rtt: &RttEstimate, now: Instant, lost: &mut Vec) { + fn maybe_fire_pto(&mut self, path: &PathRef, now: Instant, lost: &mut Vec) { let mut pto_space = None; // The spaces in which we will allow probing. let mut allow_probes = PacketNumberSpaceSet::default(); for pn_space in PacketNumberSpace::iter() { - if let Some(t) = self.pto_time(rtt, *pn_space) { + if let Some(t) = self.pto_time(path.borrow().rtt(), *pn_space) { allow_probes[*pn_space] = true; if t <= now { qdebug!([self], "PTO timer fired for {}", pn_space); @@ -889,6 +889,17 @@ impl LossRecovery { // pto_time to increase which might cause PTO for later packet number spaces to not fire. if let Some(pn_space) = pto_space { qtrace!([self], "PTO {}, probing {:?}", pn_space, allow_probes); + // Packets are only declared as lost relative to `largest_acked`. If we hit a PTO while + // we don't have a largest_acked yet, also do a congestion control reaction (because + // otherwise none would happen). + if self + .spaces + .get(pn_space) + .is_some_and(|space| space.largest_acked.is_none()) + { + path.borrow_mut() + .on_congestion_event(lost, &mut self.stats.borrow_mut(), now); + } self.fire_pto(pn_space, allow_probes, now); } } @@ -920,7 +931,7 @@ impl LossRecovery { } self.stats.borrow_mut().lost += lost_packets.len(); - self.maybe_fire_pto(primary_path.borrow().rtt(), now, &mut lost_packets); + self.maybe_fire_pto(primary_path, now, &mut lost_packets); lost_packets } diff --git a/neqo-transport/src/sender.rs b/neqo-transport/src/sender.rs index e6075ee327..4009754fd9 100644 --- a/neqo-transport/src/sender.rs +++ b/neqo-transport/src/sender.rs @@ -113,6 +113,13 @@ impl PacketSender { self.maybe_update_pacer_mtu(); } + /// Initiate a congestion response. + /// + /// Returns true if the congestion window was reduced. + pub fn on_congestion_event(&mut self, last_packet: &SentPacket, now: Instant) -> bool { + self.cc.on_congestion_event(last_packet, now) + } + /// Called when packets are lost. Returns true if the congestion window was reduced. pub fn on_packets_lost( &mut self,