Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Do CC reaction before largest_acked #2117

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 36 additions & 36 deletions neqo-transport/src/cc/classic_cc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,42 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
congestion || persistent_congestion
}

/// Handle a congestion event.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just a move? It looks like it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but from ClassicCongestionControl to CongestionControl.

/// Returns true if this was a true congestion event.
fn on_congestion_event(&mut self, last_packet: &SentPacket, now: Instant) -> bool {
// Start a new congestion event if lost or ECN CE marked packet was sent
// after the start of the previous congestion recovery period.
if !self.after_recovery_start(last_packet) {
return false;
}

let (cwnd, acked_bytes) = self.cc_algorithm.reduce_cwnd(
self.congestion_window,
self.acked_bytes,
self.max_datagram_size(),
);
self.congestion_window = max(cwnd, self.cwnd_min());
self.acked_bytes = acked_bytes;
self.ssthresh = self.congestion_window;
qdebug!(
[self],
"Cong event -> recovery; cwnd {}, ssthresh {}",
self.congestion_window,
self.ssthresh
);
qlog::metrics_updated(
&self.qlog,
&[
QlogMetric::CongestionWindow(self.congestion_window),
QlogMetric::SsThresh(self.ssthresh),
QlogMetric::InRecovery(true),
],
now,
);
self.set_state(State::RecoveryStart, now);
true
}

/// Report received ECN CE mark(s) to the congestion controller as a
/// congestion event.
///
Expand Down Expand Up @@ -549,42 +585,6 @@ impl<T: WindowAdjustment> ClassicCongestionControl<T> {
!self.state.transient() && self.recovery_start.map_or(true, |pn| packet.pn() >= pn)
}

/// Handle a congestion event.
/// Returns true if this was a true congestion event.
fn on_congestion_event(&mut self, last_packet: &SentPacket, now: Instant) -> bool {
// Start a new congestion event if lost or ECN CE marked packet was sent
// after the start of the previous congestion recovery period.
if !self.after_recovery_start(last_packet) {
return false;
}

let (cwnd, acked_bytes) = self.cc_algorithm.reduce_cwnd(
self.congestion_window,
self.acked_bytes,
self.max_datagram_size(),
);
self.congestion_window = max(cwnd, self.cwnd_min());
self.acked_bytes = acked_bytes;
self.ssthresh = self.congestion_window;
qdebug!(
[self],
"Cong event -> recovery; cwnd {}, ssthresh {}",
self.congestion_window,
self.ssthresh
);
qlog::metrics_updated(
&self.qlog,
&[
QlogMetric::CongestionWindow(self.congestion_window),
QlogMetric::SsThresh(self.ssthresh),
QlogMetric::InRecovery(true),
],
now,
);
self.set_state(State::RecoveryStart, now);
true
}

fn app_limited(&self) -> bool {
if self.bytes_in_flight >= self.congestion_window {
false
Expand Down
5 changes: 5 additions & 0 deletions neqo-transport/src/cc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ pub trait CongestionControl: Display + Debug {
now: Instant,
) -> bool;

/// Initiate a congestion response.
///
/// Returns true if the congestion window was reduced.
fn on_congestion_event(&mut self, last_packet: &SentPacket, now: Instant) -> bool;

/// Returns true if the congestion window was reduced.
fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket, now: Instant) -> bool;

Expand Down
17 changes: 17 additions & 0 deletions neqo-transport/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,23 @@
}
}

/// Initiate a congestion response.
///
/// Returns true if the congestion window was reduced.
pub fn on_congestion_event(
&mut self,
lost_packets: &[SentPacket],
stats: &mut Stats,
now: Instant,
) -> bool {
if let Some(last) = lost_packets.last() {
self.ecn_info.on_packets_lost(lost_packets, stats);
self.sender.on_congestion_event(last, now)
} else {
false

Check warning on line 1012 in neqo-transport/src/path.rs

View check run for this annotation

Codecov / codecov/patch

neqo-transport/src/path.rs#L1012

Added line #L1012 was not covered by tests
}
}

/// Determine whether we should be setting a PTO for this path. This is true when either the
/// path is valid or when there is enough remaining in the amplification limit to fit a
/// full-sized path (i.e., the path MTU).
Expand Down
17 changes: 14 additions & 3 deletions neqo-transport/src/recovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -864,12 +864,12 @@ impl LossRecovery {
/// When it has, mark a few packets as "lost" for the purposes of having frames
/// regenerated in subsequent packets. The packets aren't truly lost, so
/// we have to clone the `SentPacket` instance.
fn maybe_fire_pto(&mut self, rtt: &RttEstimate, now: Instant, lost: &mut Vec<SentPacket>) {
fn maybe_fire_pto(&mut self, path: &PathRef, now: Instant, lost: &mut Vec<SentPacket>) {
let mut pto_space = None;
// The spaces in which we will allow probing.
let mut allow_probes = PacketNumberSpaceSet::default();
for pn_space in PacketNumberSpace::iter() {
if let Some(t) = self.pto_time(rtt, *pn_space) {
if let Some(t) = self.pto_time(path.borrow().rtt(), *pn_space) {
allow_probes[*pn_space] = true;
if t <= now {
qdebug!([self], "PTO timer fired for {}", pn_space);
Expand All @@ -889,6 +889,17 @@ impl LossRecovery {
// pto_time to increase which might cause PTO for later packet number spaces to not fire.
if let Some(pn_space) = pto_space {
qtrace!([self], "PTO {}, probing {:?}", pn_space, allow_probes);
// Packets are only declared as lost relative to `largest_acked`. If we hit a PTO while
// we don't have a largest_acked yet, also do a congestion control reaction (because
// otherwise none would happen).
Comment on lines +892 to +894
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure that we want to slow down? If we haven't received an ACK, we should be at a low send rate. Do we really want to slow down further?

One thing that concerns me here is that a fast PTO (if we ever enable that) will hit this condition more often and that might not be good for performance. The same goes for long RTT connections, which might be OK while we are still retransmitting within the RTT, but once we get an RTT estimate that is long, we'll slow our send rate by a huge amount for the false PTOs we've hit.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the argument is that the combination of initial cwnd and initial RTT (= PTO) were made with some sort of safety criteria in mind. A PTO is an indication that either the RTT is much longer than the default assumption, or there is quite a bit of loss. In either of these two cases, we probably want to slow down?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess my main concern is that if we want to be more aggressive about PTO, such as sending another Initial we create a situation where slowing down is not the result of a misunderstanding of the RTT, but a deliberate choice to send more to start with.

if self
.spaces
.get(pn_space)
.is_some_and(|space| space.largest_acked.is_none())
{
path.borrow_mut()
.on_congestion_event(lost, &mut self.stats.borrow_mut(), now);
}
self.fire_pto(pn_space, allow_probes, now);
}
}
Expand Down Expand Up @@ -920,7 +931,7 @@ impl LossRecovery {
}
self.stats.borrow_mut().lost += lost_packets.len();

self.maybe_fire_pto(primary_path.borrow().rtt(), now, &mut lost_packets);
self.maybe_fire_pto(primary_path, now, &mut lost_packets);
lost_packets
}

Expand Down
7 changes: 7 additions & 0 deletions neqo-transport/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ impl PacketSender {
self.maybe_update_pacer_mtu();
}

/// Initiate a congestion response.
///
/// Returns true if the congestion window was reduced.
pub fn on_congestion_event(&mut self, last_packet: &SentPacket, now: Instant) -> bool {
self.cc.on_congestion_event(last_packet, now)
}

/// Called when packets are lost. Returns true if the congestion window was reduced.
pub fn on_packets_lost(
&mut self,
Expand Down
Loading