Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maintain in-flight counters separately per path #1777

Merged
merged 3 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 29 additions & 56 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ pub struct Connection {
//
// Congestion Control
//
/// Summary statistics of packets that have been sent, but not yet acked or deemed lost
in_flight: InFlight,
/// Whether the most recently received packet had an ECN codepoint set
receiving_ecn: bool,
/// Number of packets authenticated
Expand Down Expand Up @@ -347,7 +345,6 @@ impl Connection {
pto_count: 0,

app_limited: false,
in_flight: InFlight::new(),
receiving_ecn: false,
total_authed_packets: 0,

Expand Down Expand Up @@ -631,7 +628,7 @@ impl Connection {
debug_assert!(untracked_bytes <= self.path.current_mtu() as u64);

let bytes_to_send = u64::from(self.path.current_mtu()) + untracked_bytes;
if self.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
space_idx += 1;
congestion_blocked = true;
// We continue instead of breaking here in order to avoid
Expand Down Expand Up @@ -1343,13 +1340,13 @@ impl Connection {
// Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
self.ack_frequency.on_acked(packet);

self.on_packet_acked(now, info);
self.on_packet_acked(now, packet, info);
}
}

self.path.congestion.on_end_acks(
now,
self.in_flight.bytes,
self.path.in_flight.bytes,
self.app_limited,
self.spaces[space].largest_acked_packet,
);
Expand Down Expand Up @@ -1429,8 +1426,8 @@ impl Connection {

// Not timing-aware, so it's safe to call this for inferred acks, such as arise from
// high-latency handshakes
fn on_packet_acked(&mut self, now: Instant, info: SentPacket) {
self.remove_in_flight(&info);
fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
self.remove_in_flight(pn, &info);
if info.ack_eliciting && self.path.challenge.is_none() {
// Only pass ACKs to the congestion controller if we are not validating the current
// path, so as to ignore any ACKs from older paths still coming in.
Expand Down Expand Up @@ -1487,13 +1484,13 @@ impl Connection {
}
};
trace!(
in_flight = self.in_flight.bytes,
in_flight = self.path.in_flight.bytes,
count = self.pto_count,
?space,
"PTO fired"
);

let count = match self.in_flight.ack_eliciting {
let count = match self.path.in_flight.ack_eliciting {
// A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
// deadlock preventions
0 => {
Expand Down Expand Up @@ -1582,7 +1579,7 @@ impl Connection {

// OnPacketsLost
if let Some(largest_lost) = lost_packets.last().cloned() {
let old_bytes_in_flight = self.in_flight.bytes;
let old_bytes_in_flight = self.path.in_flight.bytes;
let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
self.lost_packets += lost_packets.len() as u64;
self.stats.path.lost_packets += lost_packets.len() as u64;
Expand All @@ -1595,7 +1592,7 @@ impl Connection {

for &packet in &lost_packets {
let info = self.spaces[pn_space].take(packet).unwrap(); // safe: lost_packets is populated just above
self.remove_in_flight(&info);
self.remove_in_flight(packet, &info);
for frame in info.stream_frames {
self.streams.retransmit(frame);
}
Expand All @@ -1608,7 +1605,7 @@ impl Connection {
}

// Don't apply congestion penalty for lost ack-only packets
let lost_ack_eliciting = old_bytes_in_flight != self.in_flight.bytes;
let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;

if lost_ack_eliciting {
self.stats.path.congestion_events += 1;
Expand All @@ -1624,7 +1621,7 @@ impl Connection {
// Handle a lost MTU probe
if let Some(packet) = lost_mtu_probe {
let info = self.spaces[SpaceId::Data].take(packet).unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet
self.remove_in_flight(&info);
self.remove_in_flight(packet, &info);
self.path.mtud.on_probe_lost();
self.stats.path.lost_plpmtud_probes += 1;
}
Expand All @@ -1640,7 +1637,7 @@ impl Connection {
let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
let mut duration = self.path.rtt.pto_base() * backoff;

if self.in_flight.ack_eliciting == 0 {
if self.path.in_flight.ack_eliciting == 0 {
debug_assert!(!self.peer_completed_address_validation());
let space = match self.highest_space {
SpaceId::Handshake => SpaceId::Handshake,
Expand Down Expand Up @@ -1702,7 +1699,7 @@ impl Connection {
return;
}

if self.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
// There is nothing to detect lost, so no timer is set. However, the client needs to arm
// the timer if the server might be blocked by the anti-amplification limit.
self.timers.stop(Timer::LossDetection);
Expand Down Expand Up @@ -2022,8 +2019,8 @@ impl Connection {
space.loss_time = None;
space.in_flight = 0;
let sent_packets = mem::take(&mut space.sent_packets);
for (_, packet) in sent_packets.into_iter() {
self.remove_in_flight(&packet);
for (pn, packet) in sent_packets.into_iter() {
self.remove_in_flight(pn, &packet);
}
self.set_loss_detection_timer(now)
}
Expand Down Expand Up @@ -2300,7 +2297,7 @@ impl Connection {

let space = &mut self.spaces[SpaceId::Initial];
if let Some(info) = space.take(0) {
self.on_packet_acked(now, info);
self.on_packet_acked(now, 0, info);
};

self.discard_space(now, SpaceId::Initial); // Make sure we clean up after any retransmitted Initials
Expand All @@ -2320,8 +2317,8 @@ impl Connection {

// Retransmit all 0-RTT data
let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
for (_, info) in zero_rtt {
self.remove_in_flight(&info);
for (pn, info) in zero_rtt {
self.remove_in_flight(pn, &info);
self.spaces[SpaceId::Data].pending |= info.retransmits;
}
self.streams.retransmit_all_for_0rtt();
Expand Down Expand Up @@ -2383,8 +2380,8 @@ impl Connection {
// Discard 0-RTT packets
let sent_packets =
mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
for (_, packet) in sent_packets {
self.remove_in_flight(&packet);
for (pn, packet) in sent_packets {
self.remove_in_flight(pn, &packet);
}
} else {
self.accepted_0rtt = true;
Expand Down Expand Up @@ -3356,7 +3353,7 @@ impl Connection {
/// acknowledged or declared lost.
#[cfg(test)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
self.in_flight.bytes
self.path.in_flight.bytes
}

/// Number of bytes worth of non-ack-only packets that may be sent
Expand All @@ -3365,7 +3362,7 @@ impl Connection {
self.path
.congestion
.window()
.saturating_sub(self.in_flight.bytes)
.saturating_sub(self.path.in_flight.bytes)
}

/// Whether no timers but keepalive, idle, rtt and pushnewcid are running
Expand Down Expand Up @@ -3438,9 +3435,13 @@ impl Connection {
}

/// Update counters to account for a packet becoming acknowledged, lost, or abandoned
fn remove_in_flight(&mut self, packet: &SentPacket) {
self.in_flight.bytes -= u64::from(packet.size);
self.in_flight.ack_eliciting -= u64::from(packet.ack_eliciting);
fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
// Visit known paths from newest to oldest to find the one `pn` was sent on
for path in [&mut self.path].into_iter().chain(self.prev_path.as_mut()) {
if path.remove_in_flight(pn, packet) {
return;
}
}
}

/// Terminate the connection instantly, without sending a close packet
Expand Down Expand Up @@ -3582,34 +3583,6 @@ mod state {
}
}

struct InFlight {
/// Sum of the sizes of all sent packets considered "in flight" by congestion control
///
/// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
/// count towards this to ensure congestion control does not impede congestion feedback.
bytes: u64,
/// Number of packets in flight containing frames other than ACK and PADDING
///
/// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
/// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
/// also be nonzero.
ack_eliciting: u64,
}

impl InFlight {
fn new() -> Self {
Self {
bytes: 0,
ack_eliciting: 0,
}
}

fn insert(&mut self, packet: &SentPacket) {
self.bytes += u64::from(packet.size);
self.ack_eliciting += u64::from(packet.ack_eliciting);
}
}

/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
Expand Down
4 changes: 2 additions & 2 deletions quinn-proto/src/connection/packet_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ impl PacketBuilder {
stream_frames: sent.stream_frames,
};

conn.in_flight.insert(&packet);
conn.in_flight.bytes -= conn.spaces[space_id].sent(exact_number, packet);
conn.path
.sent(exact_number, packet, &mut conn.spaces[space_id]);
conn.stats.path.sent_packets += 1;
conn.reset_keep_alive(now);
if size != 0 {
Expand Down
71 changes: 70 additions & 1 deletion quinn-proto/src/connection/paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ use std::{cmp, net::SocketAddr, time::Duration, time::Instant};

use tracing::trace;

use super::{mtud::MtuDiscovery, pacing::Pacer};
use super::{
mtud::MtuDiscovery,
pacing::Pacer,
spaces::{PacketSpace, SentPacket},
};
use crate::{config::MtuDiscoveryConfig, congestion, packet::SpaceId, TIMER_GRANULARITY};

/// Description of a particular network path
Expand Down Expand Up @@ -32,6 +36,12 @@ pub(super) struct PathData {
///
/// Used in persistent congestion determination.
pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
pub(super) in_flight: InFlight,
/// Number of the first packet sent on this path
///
/// Used to determine whether a packet was sent on an earlier path. Insufficient to determine if
/// a packet was sent on a later path.
first_packet: Option<u64>,
}

impl PathData {
Expand Down Expand Up @@ -61,6 +71,8 @@ impl PathData {
MtuDiscovery::new(initial_mtu, min_mtu, peer_max_udp_payload_size, config)
}),
first_packet_after_rtt_sample: None,
in_flight: InFlight::new(),
first_packet: None,
}
}

Expand All @@ -80,6 +92,8 @@ impl PathData {
total_recvd: 0,
mtud: prev.mtud.clone(),
first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
in_flight: InFlight::new(),
first_packet: None,
}
}

Expand All @@ -93,6 +107,25 @@ impl PathData {
pub(super) fn current_mtu(&self) -> u16 {
self.mtud.current_mtu()
}

/// Account for transmission of `packet` with number `pn` in `space`
pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketSpace) {
self.in_flight.insert(&packet);
if self.first_packet.is_none() {
self.first_packet = Some(pn);
}
self.in_flight.bytes -= space.sent(pn, packet);
}

/// Remove `packet` with number `pn` from this path's congestion control counters, or return
/// `false` if `pn` was sent before this path was established.
pub(super) fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) -> bool {
if self.first_packet.map_or(true, |first| first > pn) {
return false;
}
self.in_flight.remove(packet);
true
}
}

/// RTT estimation for a particular network path
Expand Down Expand Up @@ -233,3 +266,39 @@ struct PathResponse {
/// The address the corresponding PATH_CHALLENGE was received from
remote: SocketAddr,
}

/// Summary statistics of packets that have been sent on a particular path, but which have not yet
/// been acked or deemed lost
pub(super) struct InFlight {
/// Sum of the sizes of all sent packets considered "in flight" by congestion control
///
/// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
/// count towards this to ensure congestion control does not impede congestion feedback.
pub(super) bytes: u64,
/// Number of packets in flight containing frames other than ACK and PADDING
///
/// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
/// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
/// also be nonzero.
pub(super) ack_eliciting: u64,
}

impl InFlight {
fn new() -> Self {
Self {
bytes: 0,
ack_eliciting: 0,
}
}

fn insert(&mut self, packet: &SentPacket) {
self.bytes += u64::from(packet.size);
self.ack_eliciting += u64::from(packet.ack_eliciting);
}

/// Update counters to account for a packet becoming acknowledged, lost, or abandoned
fn remove(&mut self, packet: &SentPacket) {
self.bytes -= u64::from(packet.size);
self.ack_eliciting -= u64::from(packet.ack_eliciting);
}
}