Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve behavior when only one peer sends ACK-eliciting packets #1761

Merged
merged 5 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,7 @@ impl Connection {

let mut ack_eliciting_acked = false;
for packet in newly_acked.elts() {
if let Some(info) = self.spaces[space].sent_packets.remove(&packet) {
if let Some(info) = self.spaces[space].take(packet) {
if let Some(acked) = info.largest_acked {
// Assume ACKs for all packets below the largest acknowledged in `packet` have
// been received. This can cause the peer to spuriously retransmit if some of
Expand All @@ -1343,7 +1343,7 @@ impl Connection {
// Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
self.ack_frequency.on_acked(packet);

self.on_packet_acked(now, space, info);
self.on_packet_acked(now, info);
}
}

Expand Down Expand Up @@ -1429,8 +1429,8 @@ impl Connection {

// Not timing-aware, so it's safe to call this for inferred acks, such as arise from
// high-latency handshakes
fn on_packet_acked(&mut self, now: Instant, space: SpaceId, info: SentPacket) {
self.remove_in_flight(space, &info);
fn on_packet_acked(&mut self, now: Instant, info: SentPacket) {
self.remove_in_flight(&info);
if info.ack_eliciting && self.path.challenge.is_none() {
// Only pass ACKs to the congestion controller if we are not validating the current
// path, so as to ignore any ACKs from older paths still coming in.
Expand Down Expand Up @@ -1593,14 +1593,14 @@ impl Connection {
size_of_lost_packets
);

for packet in &lost_packets {
let info = self.spaces[pn_space].sent_packets.remove(packet).unwrap(); // safe: lost_packets is populated just above
self.remove_in_flight(pn_space, &info);
for &packet in &lost_packets {
let info = self.spaces[pn_space].take(packet).unwrap(); // safe: lost_packets is populated just above
self.remove_in_flight(&info);
for frame in info.stream_frames {
self.streams.retransmit(frame);
}
self.spaces[pn_space].pending |= info.retransmits;
self.path.mtud.on_non_probe_lost(*packet, info.size);
self.path.mtud.on_non_probe_lost(packet, info.size);
}

if self.path.mtud.black_hole_detected(now) {
Expand All @@ -1623,11 +1623,8 @@ impl Connection {

// Handle a lost MTU probe
if let Some(packet) = lost_mtu_probe {
let info = self.spaces[SpaceId::Data]
.sent_packets
.remove(&packet)
.unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet
self.remove_in_flight(SpaceId::Data, &info);
let info = self.spaces[SpaceId::Data].take(packet).unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet
self.remove_in_flight(&info);
self.path.mtud.on_probe_lost();
self.stats.path.lost_plpmtud_probes += 1;
}
Expand Down Expand Up @@ -2023,9 +2020,10 @@ impl Connection {
space.crypto = None;
space.time_of_last_ack_eliciting_packet = None;
space.loss_time = None;
space.in_flight = 0;
let sent_packets = mem::take(&mut space.sent_packets);
for (_, packet) in sent_packets.into_iter() {
self.remove_in_flight(space_id, &packet);
self.remove_in_flight(&packet);
}
self.set_loss_detection_timer(now)
}
Expand Down Expand Up @@ -2301,8 +2299,8 @@ impl Connection {
self.rem_handshake_cid = rem_cid;

let space = &mut self.spaces[SpaceId::Initial];
if let Some(info) = space.sent_packets.remove(&0) {
self.on_packet_acked(now, SpaceId::Initial, info);
if let Some(info) = space.take(0) {
self.on_packet_acked(now, info);
};

self.discard_space(now, SpaceId::Initial); // Make sure we clean up after any retransmitted Initials
Expand All @@ -2323,7 +2321,7 @@ impl Connection {
// Retransmit all 0-RTT data
let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
for (_, info) in zero_rtt {
self.remove_in_flight(SpaceId::Data, &info);
self.remove_in_flight(&info);
self.spaces[SpaceId::Data].pending |= info.retransmits;
}
self.streams.retransmit_all_for_0rtt();
Expand Down Expand Up @@ -2386,7 +2384,7 @@ impl Connection {
let sent_packets =
mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
for (_, packet) in sent_packets {
self.remove_in_flight(SpaceId::Data, &packet);
self.remove_in_flight(&packet);
}
} else {
self.accepted_0rtt = true;
Expand Down Expand Up @@ -2946,6 +2944,7 @@ impl Connection {
let mut sent = SentFrames::default();
let space = &mut self.spaces[space_id];
let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
space.pending_acks.maybe_ack_non_eliciting();

// HANDSHAKE_DONE
if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
Expand Down Expand Up @@ -2974,7 +2973,6 @@ impl Connection {

// ACK
if space.pending_acks.can_send() {
debug_assert!(!space.pending_acks.ranges().is_empty());
Self::populate_acks(
now,
self.receiving_ecn,
Expand Down Expand Up @@ -3440,10 +3438,9 @@ impl Connection {
}

/// Update counters to account for a packet becoming acknowledged, lost, or abandoned
fn remove_in_flight(&mut self, space: SpaceId, packet: &SentPacket) {
fn remove_in_flight(&mut self, packet: &SentPacket) {
self.in_flight.bytes -= u64::from(packet.size);
self.in_flight.ack_eliciting -= u64::from(packet.ack_eliciting);
self.spaces[space].in_flight -= u64::from(packet.size);
}

/// Terminate the connection instantly, without sending a close packet
Expand Down
2 changes: 1 addition & 1 deletion quinn-proto/src/connection/packet_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl PacketBuilder {
};

conn.in_flight.insert(&packet);
conn.spaces[space_id].sent(exact_number, packet);
conn.in_flight.bytes -= conn.spaces[space_id].sent(exact_number, packet);
conn.stats.path.sent_packets += 1;
conn.reset_keep_alive(now);
if size != 0 {
Expand Down
79 changes: 77 additions & 2 deletions quinn-proto/src/connection/spaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
cmp,
collections::{BTreeMap, VecDeque},
mem,
ops::{Index, IndexMut},
ops::{Bound, Index, IndexMut},
time::{Duration, Instant},
};

Expand Down Expand Up @@ -33,6 +33,10 @@ pub(super) struct PacketSpace {
/// The largest packet number the remote peer acknowledged in an ACK frame.
pub(super) largest_acked_packet: Option<u64>,
pub(super) largest_acked_packet_sent: Instant,
/// The highest-numbered ACK-eliciting packet we've sent
pub(super) largest_ack_eliciting_sent: u64,
/// Number of packets in `sent_packets` with numbers above `largest_ack_eliciting_sent`
pub(super) unacked_non_ack_eliciting_tail: u64,
/// Transmitted but not acked
// We use a BTreeMap here so we can efficiently query by range on ACK and for loss detection
pub(super) sent_packets: BTreeMap<u64, SentPacket>,
Expand Down Expand Up @@ -80,6 +84,8 @@ impl PacketSpace {
next_packet_number: 0,
largest_acked_packet: None,
largest_acked_packet_sent: now,
largest_ack_eliciting_sent: 0,
unacked_non_ack_eliciting_tail: 0,
sent_packets: BTreeMap::new(),
ecn_counters: frame::EcnCounts::ZERO,
ecn_feedback: frame::EcnCounts::ZERO,
Expand Down Expand Up @@ -199,9 +205,58 @@ impl PacketSpace {
Ok(ce_increase != 0)
}

pub(super) fn sent(&mut self, number: u64, packet: SentPacket) {
/// Stop tracking sent packet `number`, and return what we knew about it
pub(super) fn take(&mut self, number: u64) -> Option<SentPacket> {
let packet = self.sent_packets.remove(&number)?;
self.in_flight -= u64::from(packet.size);
if !packet.ack_eliciting && number > self.largest_ack_eliciting_sent {
self.unacked_non_ack_eliciting_tail =
self.unacked_non_ack_eliciting_tail.checked_sub(1).unwrap();
}
Some(packet)
}

/// Returns the number of bytes to *remove* from the connection's in-flight count
pub(super) fn sent(&mut self, number: u64, packet: SentPacket) -> u64 {
// Retain state for at most this many non-ACK-eliciting packets sent after the most recently
// sent ACK-eliciting packet. We're never guaranteed to receive an ACK for those, and we
// can't judge them as lost without an ACK, so to limit memory in applications which receive
// packets but don't send ACK-eliciting data for long periods use we must eventually start
// forgetting about them, although it might also be reasonable to just kill the connection
// due to weird peer behavior.
const MAX_UNACKED_NON_ACK_ELICTING_TAIL: u64 = 1_000;

let mut forgotten_bytes = 0;
if packet.ack_eliciting {
self.unacked_non_ack_eliciting_tail = 0;
self.largest_ack_eliciting_sent = number;
} else if self.unacked_non_ack_eliciting_tail > MAX_UNACKED_NON_ACK_ELICTING_TAIL {
let oldest_after_ack_eliciting = *self
.sent_packets
.range((
Bound::Excluded(self.largest_ack_eliciting_sent),
Bound::Unbounded,
))
.next()
.unwrap()
.0;
// Per https://www.rfc-editor.org/rfc/rfc9000.html#name-frames-and-frame-types,
// non-ACK-eliciting packets must only contain PADDING, ACK, and CONNECTION_CLOSE
// frames, which require no special handling on ACK or loss beyond removal from
// in-flight counters if padded.
let packet = self
.sent_packets
.remove(&oldest_after_ack_eliciting)
.unwrap();
forgotten_bytes = u64::from(packet.size);
self.in_flight -= forgotten_bytes;
} else {
self.unacked_non_ack_eliciting_tail += 1;
}

self.in_flight += u64::from(packet.size);
self.sent_packets.insert(number, packet);
forgotten_bytes
}
}

Expand Down Expand Up @@ -509,6 +564,7 @@ pub(super) struct PendingAcks {
///
/// Once the count _exceeds_ `ack_eliciting_threshold`, an immediate ACK is required
ack_eliciting_since_last_ack_sent: u64,
non_ack_eliciting_since_last_ack_sent: u64,
ack_eliciting_threshold: u64,
/// The reordering threshold, controlling how we respond to out-of-order ack-eliciting packets
///
Expand Down Expand Up @@ -538,6 +594,7 @@ impl PendingAcks {
Self {
immediate_ack_required: false,
ack_eliciting_since_last_ack_sent: 0,
non_ack_eliciting_since_last_ack_sent: 0,
ack_eliciting_threshold: 1,
reordering_threshold: 1,
earliest_ack_eliciting_since_last_ack_sent: None,
Expand Down Expand Up @@ -588,6 +645,7 @@ impl PendingAcks {
dedup: &Dedup,
) -> bool {
if !ack_eliciting {
self.non_ack_eliciting_since_last_ack_sent += 1;
return false;
}

Expand Down Expand Up @@ -669,6 +727,7 @@ impl PendingAcks {
// occasional redundant retransmits.
self.immediate_ack_required = false;
self.ack_eliciting_since_last_ack_sent = 0;
self.non_ack_eliciting_since_last_ack_sent = 0;
self.earliest_ack_eliciting_since_last_ack_sent = None;
self.largest_acked = self.largest_ack_eliciting_packet;
}
Expand All @@ -695,6 +754,22 @@ impl PendingAcks {
pub(super) fn ranges(&self) -> &ArrayRangeSet {
&self.ranges
}

/// Queue an ACK if a significant number of non-ACK-eliciting packets have not yet been
/// acknowledged
///
/// Should be called immediately before a non-probing packet is composed, when we've already
/// committed to sending a packet regardless.
pub(super) fn maybe_ack_non_eliciting(&mut self) {
// If we're going to send a packet anyway, and we've received a significant number of
// non-ACK-eliciting packets, then include an ACK to help the peer perform timely loss
// detection even if they're not sending any ACK-eliciting packets themselves. Exact
// threshold chosen somewhat arbitrarily.
const LAZY_ACK_THRESHOLD: u64 = 10;
djc marked this conversation as resolved.
Show resolved Hide resolved
if self.non_ack_eliciting_since_last_ack_sent > LAZY_ACK_THRESHOLD {
self.immediate_ack_required = true;
}
}
}

/// Helper for mitigating [optimistic ACK attacks]
Expand Down
24 changes: 24 additions & 0 deletions quinn-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2787,3 +2787,27 @@ fn reject_new_connections() {
pair.server.assert_no_accept();
assert!(pair.client.connections.get(&client_ch).unwrap().is_closed());
}

/// Verify that an endpoint which receives but does not send ACK-eliciting data still receives ACKs
/// occasionally. This is not required for conformance, but makes loss detection more responsive and
/// reduces receiver memory use.
#[test]
fn pure_sender_voluntarily_acks() {
let _guard = subscribe();
let mut pair = Pair::default();
let (client_ch, server_ch) = pair.connect();

let receiver_acks_initial = pair.server_conn_mut(server_ch).stats().frame_rx.acks;

for _ in 0..100 {
const MSG: &[u8] = b"hello";
pair.client_datagrams(client_ch)
.send(Bytes::from_static(MSG))
.unwrap();
pair.drive();
assert_eq!(pair.server_datagrams(server_ch).recv().unwrap(), MSG);
}

let receiver_acks_final = pair.server_conn_mut(server_ch).stats().frame_rx.acks;
assert!(receiver_acks_final > receiver_acks_initial);
}