Skip to content

Commit

Permalink
Maintain in-flight counters separately per path
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed Mar 10, 2024
1 parent 9b8967a commit 70d489f
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 62 deletions.
92 changes: 33 additions & 59 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ pub struct Connection {
//
// Congestion Control
//
/// Summary statistics of packets that have been sent, but not yet acked or deemed lost
in_flight: InFlight,
/// Whether the most recently received packet had an ECN codepoint set
receiving_ecn: bool,
/// Number of packets authenticated
Expand Down Expand Up @@ -347,7 +345,6 @@ impl Connection {
pto_count: 0,

app_limited: false,
in_flight: InFlight::new(),
receiving_ecn: false,
total_authed_packets: 0,

Expand Down Expand Up @@ -631,7 +628,7 @@ impl Connection {
debug_assert!(untracked_bytes <= self.path.current_mtu() as u64);

let bytes_to_send = u64::from(self.path.current_mtu()) + untracked_bytes;
if self.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
space_idx += 1;
congestion_blocked = true;
// We continue instead of breaking here in order to avoid
Expand Down Expand Up @@ -1343,13 +1340,13 @@ impl Connection {
// Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
self.ack_frequency.on_acked(packet);

self.on_packet_acked(now, info);
self.on_packet_acked(now, packet, info);
}
}

self.path.congestion.on_end_acks(
now,
self.in_flight.bytes,
self.path.in_flight.bytes,
self.app_limited,
self.spaces[space].largest_acked_packet,
);
Expand Down Expand Up @@ -1429,8 +1426,8 @@ impl Connection {

// Not timing-aware, so it's safe to call this for inferred acks, such as arise from
// high-latency handshakes
fn on_packet_acked(&mut self, now: Instant, info: SentPacket) {
self.in_flight.remove(&info);
fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
self.remove_in_flight(pn, &info);
if info.ack_eliciting && self.path.challenge.is_none() {
// Only pass ACKs to the congestion controller if we are not validating the current
// path, so as to ignore any ACKs from older paths still coming in.
Expand Down Expand Up @@ -1487,13 +1484,13 @@ impl Connection {
}
};
trace!(
in_flight = self.in_flight.bytes,
in_flight = self.path.in_flight.bytes,
count = self.pto_count,
?space,
"PTO fired"
);

let count = match self.in_flight.ack_eliciting {
let count = match self.path.in_flight.ack_eliciting {
// A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
// deadlock preventions
0 => {
Expand Down Expand Up @@ -1582,7 +1579,7 @@ impl Connection {

// OnPacketsLost
if let Some(largest_lost) = lost_packets.last().cloned() {
let old_bytes_in_flight = self.in_flight.bytes;
let old_bytes_in_flight = self.path.in_flight.bytes;
let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
self.lost_packets += lost_packets.len() as u64;
self.stats.path.lost_packets += lost_packets.len() as u64;
Expand All @@ -1595,7 +1592,7 @@ impl Connection {

for &packet in &lost_packets {
let info = self.spaces[pn_space].take(packet).unwrap(); // safe: lost_packets is populated just above
self.in_flight.remove(&info);
self.remove_in_flight(packet, &info);
for frame in info.stream_frames {
self.streams.retransmit(frame);
}
Expand All @@ -1608,7 +1605,7 @@ impl Connection {
}

// Don't apply congestion penalty for lost ack-only packets
let lost_ack_eliciting = old_bytes_in_flight != self.in_flight.bytes;
let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;

if lost_ack_eliciting {
self.stats.path.congestion_events += 1;
Expand All @@ -1624,7 +1621,7 @@ impl Connection {
// Handle a lost MTU probe
if let Some(packet) = lost_mtu_probe {
let info = self.spaces[SpaceId::Data].take(packet).unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet
self.in_flight.remove(&info);
self.remove_in_flight(packet, &info);
self.path.mtud.on_probe_lost();
self.stats.path.lost_plpmtud_probes += 1;
}
Expand All @@ -1640,7 +1637,7 @@ impl Connection {
let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
let mut duration = self.path.rtt.pto_base() * backoff;

if self.in_flight.ack_eliciting == 0 {
if self.path.in_flight.ack_eliciting == 0 {
debug_assert!(!self.peer_completed_address_validation());
let space = match self.highest_space {
SpaceId::Handshake => SpaceId::Handshake,
Expand Down Expand Up @@ -1702,7 +1699,7 @@ impl Connection {
return;
}

if self.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
// There is nothing to detect lost, so no timer is set. However, the client needs to arm
// the timer if the server might be blocked by the anti-amplification limit.
self.timers.stop(Timer::LossDetection);
Expand Down Expand Up @@ -2022,8 +2019,8 @@ impl Connection {
space.loss_time = None;
space.in_flight = 0;
let sent_packets = mem::take(&mut space.sent_packets);
for (_, packet) in sent_packets.into_iter() {
self.in_flight.remove(&packet);
for (pn, packet) in sent_packets.into_iter() {
self.remove_in_flight(pn, &packet);
}
self.set_loss_detection_timer(now)
}
Expand Down Expand Up @@ -2300,7 +2297,7 @@ impl Connection {

let space = &mut self.spaces[SpaceId::Initial];
if let Some(info) = space.take(0) {
self.on_packet_acked(now, info);
self.on_packet_acked(now, 0, info);
};

self.discard_space(now, SpaceId::Initial); // Make sure we clean up after any retransmitted Initials
Expand All @@ -2320,8 +2317,8 @@ impl Connection {

// Retransmit all 0-RTT data
let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
for (_, info) in zero_rtt {
self.in_flight.remove(&info);
for (pn, info) in zero_rtt {
self.remove_in_flight(pn, &info);
self.spaces[SpaceId::Data].pending |= info.retransmits;
}
self.streams.retransmit_all_for_0rtt();
Expand Down Expand Up @@ -2383,8 +2380,8 @@ impl Connection {
// Discard 0-RTT packets
let sent_packets =
mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
for (_, packet) in sent_packets {
self.in_flight.remove(&packet);
for (pn, packet) in sent_packets {
self.remove_in_flight(pn, &packet);
}
} else {
self.accepted_0rtt = true;
Expand Down Expand Up @@ -3356,7 +3353,7 @@ impl Connection {
/// acknowledged or declared lost.
#[cfg(test)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
self.in_flight.bytes
self.path.in_flight.bytes
}

/// Number of bytes worth of non-ack-only packets that may be sent
Expand All @@ -3365,7 +3362,7 @@ impl Connection {
self.path
.congestion
.window()
.saturating_sub(self.in_flight.bytes)
.saturating_sub(self.path.in_flight.bytes)
}

/// Whether no timers but keepalive, idle, rtt and pushnewcid are running
Expand Down Expand Up @@ -3437,6 +3434,17 @@ impl Connection {
|| !self.datagrams.outgoing.is_empty()
}

/// Update counters to account for a packet becoming acknowledged, lost, or abandoned
fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
// Visit known paths from newest to oldest to find the one `pn` was sent on
for path in [&mut self.path].into_iter().chain(self.prev_path.as_mut()) {
if path.first_packet.map_or(false, |first| first <= pn) {
path.in_flight.remove(packet);
return;
}
}
}

/// Terminate the connection instantly, without sending a close packet
fn kill(&mut self, reason: ConnectionError) {
self.close_common();
Expand Down Expand Up @@ -3576,40 +3584,6 @@ mod state {
}
}

struct InFlight {
/// Sum of the sizes of all sent packets considered "in flight" by congestion control
///
/// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
/// count towards this to ensure congestion control does not impede congestion feedback.
bytes: u64,
/// Number of packets in flight containing frames other than ACK and PADDING
///
/// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
/// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
/// also be nonzero.
ack_eliciting: u64,
}

impl InFlight {
fn new() -> Self {
Self {
bytes: 0,
ack_eliciting: 0,
}
}

fn insert(&mut self, packet: &SentPacket) {
self.bytes += u64::from(packet.size);
self.ack_eliciting += u64::from(packet.ack_eliciting);
}

/// Update counters to account for a packet becoming acknowledged, lost, or abandoned
fn remove(&mut self, packet: &SentPacket) {
self.bytes -= u64::from(packet.size);
self.ack_eliciting -= u64::from(packet.ack_eliciting);
}
}

/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
Expand Down
7 changes: 5 additions & 2 deletions quinn-proto/src/connection/packet_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,11 @@ impl PacketBuilder {
stream_frames: sent.stream_frames,
};

conn.in_flight.insert(&packet);
conn.in_flight.bytes -= conn.spaces[space_id].sent(exact_number, packet);
conn.path.in_flight.insert(&packet);
if conn.path.first_packet.is_none() {
conn.path.first_packet = Some(exact_number);
}
conn.path.in_flight.bytes -= conn.spaces[space_id].sent(exact_number, packet);
conn.stats.path.sent_packets += 1;
conn.reset_keep_alive(now);
if size != 0 {
Expand Down
48 changes: 47 additions & 1 deletion quinn-proto/src/connection/paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{cmp, net::SocketAddr, time::Duration, time::Instant};

use tracing::trace;

use super::{mtud::MtuDiscovery, pacing::Pacer};
use super::{mtud::MtuDiscovery, pacing::Pacer, spaces::SentPacket};
use crate::{config::MtuDiscoveryConfig, congestion, packet::SpaceId, TIMER_GRANULARITY};

/// Description of a particular network path
Expand Down Expand Up @@ -32,6 +32,12 @@ pub(super) struct PathData {
///
/// Used in persistent congestion determination.
pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
pub(super) in_flight: InFlight,
/// Number of the first packet sent on this path
///
/// Used to determine whether a packet was sent on an earlier path. Insufficient to determine if
/// a packet was sent on a later path.
pub(super) first_packet: Option<u64>,
}

impl PathData {
Expand Down Expand Up @@ -61,6 +67,8 @@ impl PathData {
MtuDiscovery::new(initial_mtu, min_mtu, peer_max_udp_payload_size, config)
}),
first_packet_after_rtt_sample: None,
in_flight: InFlight::new(),
first_packet: None,
}
}

Expand All @@ -80,6 +88,8 @@ impl PathData {
total_recvd: 0,
mtud: prev.mtud.clone(),
first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
in_flight: InFlight::new(),
first_packet: None,
}
}

Expand Down Expand Up @@ -233,3 +243,39 @@ struct PathResponse {
/// The address the corresponding PATH_CHALLENGE was received from
remote: SocketAddr,
}

/// Summary statistics of packets that have been sent on a particular path, but which have not yet
/// been acked or deemed lost
pub(super) struct InFlight {
/// Sum of the sizes of all sent packets considered "in flight" by congestion control
///
/// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
/// count towards this to ensure congestion control does not impede congestion feedback.
pub(super) bytes: u64,
/// Number of packets in flight containing frames other than ACK and PADDING
///
/// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
/// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
/// also be nonzero.
pub(super) ack_eliciting: u64,
}

impl InFlight {
pub(super) fn new() -> Self {
Self {
bytes: 0,
ack_eliciting: 0,
}
}

pub(super) fn insert(&mut self, packet: &SentPacket) {
self.bytes += u64::from(packet.size);
self.ack_eliciting += u64::from(packet.ack_eliciting);
}

/// Update counters to account for a packet becoming acknowledged, lost, or abandoned
pub(super) fn remove(&mut self, packet: &SentPacket) {
self.bytes -= u64::from(packet.size);
self.ack_eliciting -= u64::from(packet.ack_eliciting);
}
}

0 comments on commit 70d489f

Please sign in to comment.