From 7028479a4bc0e652f97295c5c57bfafd42c7dd92 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Tue, 19 Mar 2024 16:17:50 +1000 Subject: [PATCH] Revert "Merge branch 'sent-packets-vec' of github.com:martinthomson/neqo" This reverts commit 8e8972c13bd3d511b5d2c7e1de8478fcc54d1478, reversing changes made to 32ef2c3cb2f0f13c6b46222d98c3316a69d7b411. --- neqo-transport/src/cc/classic_cc.rs | 60 +-- neqo-transport/src/cc/mod.rs | 2 +- neqo-transport/src/cc/tests/cubic.rs | 2 +- neqo-transport/src/cc/tests/new_reno.rs | 6 +- neqo-transport/src/connection/mod.rs | 14 +- neqo-transport/src/path.rs | 8 +- neqo-transport/src/qlog.rs | 11 +- .../src/{recovery/mod.rs => recovery.rs} | 207 ++++++--- neqo-transport/src/recovery/sent.rs | 413 ------------------ neqo-transport/src/recovery/token.rs | 63 --- neqo-transport/src/sender.rs | 4 +- neqo-transport/src/tracking.rs | 108 +++++ 12 files changed, 302 insertions(+), 596 deletions(-) rename neqo-transport/src/{recovery/mod.rs => recovery.rs} (91%) delete mode 100644 neqo-transport/src/recovery/sent.rs delete mode 100644 neqo-transport/src/recovery/token.rs diff --git a/neqo-transport/src/cc/classic_cc.rs b/neqo-transport/src/cc/classic_cc.rs index 0e26d39cb..89be6c4b0 100644 --- a/neqo-transport/src/cc/classic_cc.rs +++ b/neqo-transport/src/cc/classic_cc.rs @@ -17,9 +17,9 @@ use crate::{ cc::MAX_DATAGRAM_SIZE, packet::PacketNumber, qlog::{self, QlogMetric}, - recovery::SentPacket, rtt::RttEstimate, sender::PACING_BURST_SIZE, + tracking::SentPacket, }; #[rustfmt::skip] // to keep `::` and thus prevent conflict with `crate::qlog` use ::qlog::events::{quic::CongestionStateUpdated, EventData}; @@ -167,8 +167,8 @@ impl CongestionControl for ClassicCongestionControl { qinfo!( "packet_acked this={:p}, pn={}, ps={}, ignored={}, lost={}, rtt_est={:?}", self, - pkt.pn(), - pkt.len(), + pkt.pn, + pkt.size, i32::from(!pkt.cc_outstanding()), i32::from(pkt.lost()), rtt_est, @@ -176,11 +176,11 @@ impl CongestionControl for ClassicCongestionControl { if !pkt.cc_outstanding() { continue; } - if pkt.pn() < self.first_app_limited { + if pkt.pn < self.first_app_limited { is_app_limited = false; } - assert!(self.bytes_in_flight >= pkt.len()); - self.bytes_in_flight -= pkt.len(); + assert!(self.bytes_in_flight >= pkt.size); + self.bytes_in_flight -= pkt.size; if !self.after_recovery_start(pkt) { // Do not increase congestion window for packets sent before @@ -193,7 +193,7 @@ impl CongestionControl for ClassicCongestionControl { qlog::metrics_updated(&mut self.qlog, &[QlogMetric::InRecovery(false)]); } - new_acked += pkt.len(); + new_acked += pkt.size; } if is_app_limited { @@ -268,11 +268,11 @@ impl CongestionControl for ClassicCongestionControl { qinfo!( "packet_lost this={:p}, pn={}, ps={}", self, - pkt.pn(), - pkt.len() + pkt.pn, + pkt.size ); - assert!(self.bytes_in_flight >= pkt.len()); - self.bytes_in_flight -= pkt.len(); + assert!(self.bytes_in_flight >= pkt.size); + self.bytes_in_flight -= pkt.size; } qlog::metrics_updated( &mut self.qlog, @@ -298,13 +298,13 @@ impl CongestionControl for ClassicCongestionControl { fn discard(&mut self, pkt: &SentPacket) { if pkt.cc_outstanding() { - assert!(self.bytes_in_flight >= pkt.len()); - self.bytes_in_flight -= pkt.len(); + assert!(self.bytes_in_flight >= pkt.size); + self.bytes_in_flight -= pkt.size; qlog::metrics_updated( &mut self.qlog, &[QlogMetric::BytesInFlight(self.bytes_in_flight)], ); - qtrace!([self], "Ignore pkt with size {}", pkt.len()); + qtrace!([self], "Ignore pkt with size {}", pkt.size); } } @@ -319,7 +319,7 @@ impl CongestionControl for ClassicCongestionControl { fn on_packet_sent(&mut self, pkt: &SentPacket) { // Record the recovery time and exit any transient state. if self.state.transient() { - self.recovery_start = Some(pkt.pn()); + self.recovery_start = Some(pkt.pn); self.state.update(); } @@ -331,15 +331,15 @@ impl CongestionControl for ClassicCongestionControl { // window. Assume that all in-flight packets up to this one are NOT app-limited. // However, subsequent packets might be app-limited. Set `first_app_limited` to the // next packet number. - self.first_app_limited = pkt.pn() + 1; + self.first_app_limited = pkt.pn + 1; } - self.bytes_in_flight += pkt.len(); + self.bytes_in_flight += pkt.size; qinfo!( "packet_sent this={:p}, pn={}, ps={}", self, - pkt.pn(), - pkt.len() + pkt.pn, + pkt.size ); qlog::metrics_updated( &mut self.qlog, @@ -438,20 +438,20 @@ impl ClassicCongestionControl { let cutoff = max(first_rtt_sample_time, prev_largest_acked_sent); for p in lost_packets .iter() - .skip_while(|p| Some(p.time_sent()) < cutoff) + .skip_while(|p| Some(p.time_sent) < cutoff) { - if p.pn() != last_pn + 1 { + if p.pn != last_pn + 1 { // Not a contiguous range of lost packets, start over. start = None; } - last_pn = p.pn(); + last_pn = p.pn; if !p.cc_in_flight() { // Not interesting, keep looking. continue; } if let Some(t) = start { let elapsed = p - .time_sent() + .time_sent .checked_duration_since(t) .expect("time is monotonic"); if elapsed > pc_period { @@ -466,7 +466,7 @@ impl ClassicCongestionControl { return true; } } else { - start = Some(p.time_sent()); + start = Some(p.time_sent); } } false @@ -480,7 +480,7 @@ impl ClassicCongestionControl { // state and update the variable `self.recovery_start`. Before the // first recovery, all packets were sent after the recovery event, // allowing to reduce the cwnd on congestion events. - !self.state.transient() && self.recovery_start.map_or(true, |pn| packet.pn() >= pn) + !self.state.transient() && self.recovery_start.map_or(true, |pn| packet.pn >= pn) } /// Handle a congestion event. @@ -551,8 +551,8 @@ mod tests { CongestionControl, CongestionControlAlgorithm, CWND_INITIAL_PKTS, MAX_DATAGRAM_SIZE, }, packet::{PacketNumber, PacketType}, - recovery::SentPacket, rtt::RttEstimate, + tracking::SentPacket, }; const PTO: Duration = Duration::from_millis(100); @@ -912,12 +912,12 @@ mod tests { fn persistent_congestion_ack_eliciting() { let mut lost = make_lost(&[1, PERSISTENT_CONG_THRESH + 2]); lost[0] = SentPacket::new( - lost[0].packet_type(), - lost[0].pn(), - lost[0].time_sent(), + lost[0].pt, + lost[0].pn, + lost[0].time_sent, false, Vec::new(), - lost[0].len(), + lost[0].size, ); assert!(!persistent_congestion_by_pto( ClassicCongestionControl::new(NewReno::default()), diff --git a/neqo-transport/src/cc/mod.rs b/neqo-transport/src/cc/mod.rs index 965b3e555..486d15e67 100644 --- a/neqo-transport/src/cc/mod.rs +++ b/neqo-transport/src/cc/mod.rs @@ -14,7 +14,7 @@ use std::{ use neqo_common::qlog::NeqoQlog; -use crate::{path::PATH_MTU_V6, recovery::SentPacket, rtt::RttEstimate, Error}; +use crate::{path::PATH_MTU_V6, rtt::RttEstimate, tracking::SentPacket, Error}; mod classic_cc; mod cubic; diff --git a/neqo-transport/src/cc/tests/cubic.rs b/neqo-transport/src/cc/tests/cubic.rs index 1d933790e..2e0200fd6 100644 --- a/neqo-transport/src/cc/tests/cubic.rs +++ b/neqo-transport/src/cc/tests/cubic.rs @@ -24,8 +24,8 @@ use crate::{ CongestionControl, MAX_DATAGRAM_SIZE, MAX_DATAGRAM_SIZE_F64, }, packet::PacketType, - recovery::SentPacket, rtt::RttEstimate, + tracking::SentPacket, }; const RTT: Duration = Duration::from_millis(100); diff --git a/neqo-transport/src/cc/tests/new_reno.rs b/neqo-transport/src/cc/tests/new_reno.rs index 863c15c26..4cc20de5a 100644 --- a/neqo-transport/src/cc/tests/new_reno.rs +++ b/neqo-transport/src/cc/tests/new_reno.rs @@ -16,8 +16,8 @@ use crate::{ MAX_DATAGRAM_SIZE, }, packet::PacketType, - recovery::SentPacket, rtt::RttEstimate, + tracking::SentPacket, }; const PTO: Duration = Duration::from_millis(100); @@ -125,14 +125,14 @@ fn issue_876() { // and ack it. cwnd increases slightly cc.on_packets_acked(&sent_packets[6..], &RTT_ESTIMATE, time_now); - assert_eq!(cc.acked_bytes(), sent_packets[6].len()); + assert_eq!(cc.acked_bytes(), sent_packets[6].size); cwnd_is_halved(&cc); assert_eq!(cc.bytes_in_flight(), 5 * MAX_DATAGRAM_SIZE - 2); // Packet from before is lost. Should not hurt cwnd. cc.on_packets_lost(Some(time_now), None, PTO, &sent_packets[1..2]); assert!(!cc.recovery_packet()); - assert_eq!(cc.acked_bytes(), sent_packets[6].len()); + assert_eq!(cc.acked_bytes(), sent_packets[6].size); cwnd_is_halved(&cc); assert_eq!(cc.bytes_in_flight(), 4 * MAX_DATAGRAM_SIZE); } diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index 03f05aad0..c81a3727c 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -44,7 +44,7 @@ use crate::{ path::{Path, PathRef, Paths}, qlog, quic_datagrams::{DatagramTracking, QuicDatagrams}, - recovery::{LossRecovery, RecoveryToken, SendProfile, SentPacket}, + recovery::{LossRecovery, RecoveryToken, SendProfile}, recv_stream::RecvStreamStats, rtt::GRANULARITY, send_stream::SendStream, @@ -55,7 +55,7 @@ use crate::{ self, TransportParameter, TransportParameterId, TransportParameters, TransportParametersHandler, }, - tracking::{AckTracker, PacketNumberSpace}, + tracking::{AckTracker, PacketNumberSpace, SentPacket}, version::{Version, WireVersion}, AppError, ConnectionError, Error, Res, StreamId, }; @@ -2336,7 +2336,7 @@ impl Connection { packets.len(), mtu ); - initial.add_padding(mtu - packets.len()); + initial.size += mtu - packets.len(); packets.resize(mtu, 0); } self.loss_recovery.on_packet_sent(path, initial); @@ -2855,7 +2855,7 @@ impl Connection { /// to retransmit the frame as needed. fn handle_lost_packets(&mut self, lost_packets: &[SentPacket]) { for lost in lost_packets { - for token in lost.tokens() { + for token in &lost.tokens { qdebug!([self], "Lost: {:?}", token); match token { RecoveryToken::Ack(_) => {} @@ -2891,12 +2891,12 @@ impl Connection { fn handle_ack( &mut self, space: PacketNumberSpace, - largest_acknowledged: PacketNumber, + largest_acknowledged: u64, ack_ranges: R, ack_delay: u64, now: Instant, ) where - R: IntoIterator> + Debug, + R: IntoIterator> + Debug, R::IntoIter: ExactSizeIterator, { qinfo!([self], "Rx ACK space={}, ranges={:?}", space, ack_ranges); @@ -2910,7 +2910,7 @@ impl Connection { now, ); for acked in acked_packets { - for token in acked.tokens() { + for token in &acked.tokens { match token { RecoveryToken::Stream(stream_token) => self.streams.acked(stream_token), RecoveryToken::Ack(at) => self.acks.acked(at), diff --git a/neqo-transport/src/path.rs b/neqo-transport/src/path.rs index 59bb871b3..4e8d9958a 100644 --- a/neqo-transport/src/path.rs +++ b/neqo-transport/src/path.rs @@ -24,11 +24,11 @@ use crate::{ cid::{ConnectionId, ConnectionIdRef, ConnectionIdStore, RemoteConnectionIdEntry}, frame::{FRAME_TYPE_PATH_CHALLENGE, FRAME_TYPE_PATH_RESPONSE, FRAME_TYPE_RETIRE_CONNECTION_ID}, packet::PacketBuilder, - recovery::{RecoveryToken, SentPacket}, + recovery::RecoveryToken, rtt::RttEstimate, sender::PacketSender, stats::FrameStats, - tracking::PacketNumberSpace, + tracking::{PacketNumberSpace, SentPacket}, Stats, }; @@ -943,12 +943,12 @@ impl Path { qinfo!( [self], "discarding a packet without an RTT estimate; guessing RTT={:?}", - now - sent.time_sent() + now - sent.time_sent ); stats.rtt_init_guess = true; self.rtt.update( &mut self.qlog, - now - sent.time_sent(), + now - sent.time_sent, Duration::new(0, 0), false, now, diff --git a/neqo-transport/src/qlog.rs b/neqo-transport/src/qlog.rs index 773297413..257296610 100644 --- a/neqo-transport/src/qlog.rs +++ b/neqo-transport/src/qlog.rs @@ -27,9 +27,9 @@ use crate::{ frame::{CloseError, Frame}, packet::{DecryptedPacket, PacketNumber, PacketType, PublicPacket}, path::PathRef, - recovery::SentPacket, stream_id::StreamType as NeqoStreamType, tparams::{self, TransportParametersHandler}, + tracking::SentPacket, version::{Version, VersionConfig, WireVersion}, }; @@ -259,13 +259,8 @@ pub fn packet_dropped(qlog: &mut NeqoQlog, public_packet: &PublicPacket) { pub fn packets_lost(qlog: &mut NeqoQlog, pkts: &[SentPacket]) { qlog.add_event_with_stream(|stream| { for pkt in pkts { - let header = PacketHeader::with_type( - to_qlog_pkt_type(pkt.packet_type()), - Some(pkt.pn()), - None, - None, - None, - ); + let header = + PacketHeader::with_type(to_qlog_pkt_type(pkt.pt), Some(pkt.pn), None, None, None); let ev_data = EventData::PacketLost(PacketLost { header: Some(header), diff --git a/neqo-transport/src/recovery/mod.rs b/neqo-transport/src/recovery.rs similarity index 91% rename from neqo-transport/src/recovery/mod.rs rename to neqo-transport/src/recovery.rs index 2aec479b5..dbea3aaf5 100644 --- a/neqo-transport/src/recovery/mod.rs +++ b/neqo-transport/src/recovery.rs @@ -6,29 +6,30 @@ // Tracking of sent packets and detecting their loss. -mod sent; -mod token; - use std::{ cmp::{max, min}, - convert::TryFrom, + collections::BTreeMap, + mem, ops::RangeInclusive, time::{Duration, Instant}, }; use neqo_common::{qdebug, qinfo, qlog::NeqoQlog, qtrace, qwarn}; -pub use sent::SentPacket; -use sent::SentPackets; use smallvec::{smallvec, SmallVec}; -pub use token::{RecoveryToken, StreamRecoveryToken}; use crate::{ + ackrate::AckRate, + cid::ConnectionIdEntry, + crypto::CryptoRecoveryToken, packet::PacketNumber, path::{Path, PathRef}, qlog::{self, QlogMetric}, + quic_datagrams::DatagramTracking, rtt::RttEstimate, + send_stream::SendStreamRecoveryToken, stats::{Stats, StatsCell}, - tracking::{PacketNumberSpace, PacketNumberSpaceSet}, + stream_id::{StreamId, StreamType}, + tracking::{AckToken, PacketNumberSpace, PacketNumberSpaceSet, SentPacket}, }; pub(crate) const PACKET_THRESHOLD: u64 = 3; @@ -47,6 +48,54 @@ pub(crate) const MIN_OUTSTANDING_UNACK: usize = 16; /// The scale we use for the fast PTO feature. pub const FAST_PTO_SCALE: u8 = 100; +#[derive(Debug, Clone)] +#[allow(clippy::module_name_repetitions)] +pub enum StreamRecoveryToken { + Stream(SendStreamRecoveryToken), + ResetStream { + stream_id: StreamId, + }, + StopSending { + stream_id: StreamId, + }, + + MaxData(u64), + DataBlocked(u64), + + MaxStreamData { + stream_id: StreamId, + max_data: u64, + }, + StreamDataBlocked { + stream_id: StreamId, + limit: u64, + }, + + MaxStreams { + stream_type: StreamType, + max_streams: u64, + }, + StreamsBlocked { + stream_type: StreamType, + limit: u64, + }, +} + +#[derive(Debug, Clone)] +#[allow(clippy::module_name_repetitions)] +pub enum RecoveryToken { + Stream(StreamRecoveryToken), + Ack(AckToken), + Crypto(CryptoRecoveryToken), + HandshakeDone, + KeepAlive, // Special PING. + NewToken(usize), + NewConnectionId(ConnectionIdEntry<[u8; 16]>), + RetireConnectionId(u64), + AckFrequency(AckRate), + Datagram(DatagramTracking), +} + /// `SendProfile` tells a sender how to send packets. #[derive(Debug)] pub struct SendProfile { @@ -131,8 +180,7 @@ pub(crate) struct LossRecoverySpace { /// This might be less than the number of ACK-eliciting packets, /// because PTO packets don't count. in_flight_outstanding: usize, - /// The packets that we have sent and are tracking. - sent_packets: SentPackets, + sent_packets: BTreeMap, /// The time that the first out-of-order packet was sent. /// This is `None` if there were no out-of-order packets detected. /// When set to `Some(T)`, time-based loss detection should be enabled. @@ -147,7 +195,7 @@ impl LossRecoverySpace { largest_acked_sent_time: None, last_ack_eliciting: None, in_flight_outstanding: 0, - sent_packets: SentPackets::default(), + sent_packets: BTreeMap::default(), first_ooo_time: None, } } @@ -172,9 +220,9 @@ impl LossRecoverySpace { pub fn pto_packets(&mut self, count: usize) -> impl Iterator { self.sent_packets .iter_mut() - .filter_map(|sent| { + .filter_map(|(pn, sent)| { if sent.pto() { - qtrace!("PTO: marking packet {} lost ", sent.pn()); + qtrace!("PTO: marking packet {} lost ", pn); Some(&*sent) } else { None @@ -207,16 +255,16 @@ impl LossRecoverySpace { pub fn on_packet_sent(&mut self, sent_packet: SentPacket) { if sent_packet.ack_eliciting() { - self.last_ack_eliciting = Some(sent_packet.time_sent()); + self.last_ack_eliciting = Some(sent_packet.time_sent); self.in_flight_outstanding += 1; } else if self.space != PacketNumberSpace::ApplicationData && self.last_ack_eliciting.is_none() { // For Initial and Handshake spaces, make sure that we have a PTO baseline // always. See `LossRecoverySpace::pto_base_time()` for details. - self.last_ack_eliciting = Some(sent_packet.time_sent()); + self.last_ack_eliciting = Some(sent_packet.time_sent); } - self.sent_packets.track(sent_packet); + self.sent_packets.insert(sent_packet.pn, sent_packet); } /// If we are only sending ACK frames, send a PING frame after 2 PTOs so that @@ -246,31 +294,46 @@ impl LossRecoverySpace { } } - /// Remove all newly acknowledged packets. + /// Remove all acknowledged packets. /// Returns all the acknowledged packets, with the largest packet number first. /// ...and a boolean indicating if any of those packets were ack-eliciting. /// This operates more efficiently because it assumes that the input is sorted /// in the order that an ACK frame is (from the top). fn remove_acked(&mut self, acked_ranges: R, stats: &mut Stats) -> (Vec, bool) where - R: IntoIterator>, + R: IntoIterator>, R::IntoIter: ExactSizeIterator, { - let mut eliciting = false; + let acked_ranges = acked_ranges.into_iter(); + let mut keep = Vec::with_capacity(acked_ranges.len()); + let mut acked = Vec::new(); + let mut eliciting = false; for range in acked_ranges { - self.sent_packets.take_range(range, &mut acked); - } - for p in &acked { - self.remove_packet(p); - eliciting |= p.ack_eliciting(); - if p.lost() { - stats.late_ack += 1; - } - if p.pto_fired() { - stats.pto_ack += 1; + let first_keep = *range.end() + 1; + if let Some((&first, _)) = self.sent_packets.range(range).next() { + let mut tail = self.sent_packets.split_off(&first); + if let Some((&next, _)) = tail.range(first_keep..).next() { + keep.push(tail.split_off(&next)); + } + for (_, p) in tail.into_iter().rev() { + self.remove_packet(&p); + eliciting |= p.ack_eliciting(); + if p.lost() { + stats.late_ack += 1; + } + if p.pto_fired() { + stats.pto_ack += 1; + } + acked.push(p); + } } } + + for mut k in keep.into_iter().rev() { + self.sent_packets.append(&mut k); + } + (acked, eliciting) } @@ -279,12 +342,12 @@ impl LossRecoverySpace { /// and when keys are dropped. fn remove_ignored(&mut self) -> impl Iterator { self.in_flight_outstanding = 0; - std::mem::take(&mut self.sent_packets).drain_all() + mem::take(&mut self.sent_packets).into_values() } /// Remove the primary path marking on any packets this is tracking. fn migrate(&mut self) { - for pkt in self.sent_packets.iter_mut() { + for pkt in self.sent_packets.values_mut() { pkt.clear_primary_path(); } } @@ -295,8 +358,23 @@ impl LossRecoverySpace { /// might remove all in-flight packets and stop sending probes. #[allow(clippy::option_if_let_else)] // Hard enough to read as-is. fn remove_old_lost(&mut self, now: Instant, cd: Duration) { - for p in self.sent_packets.remove_expired(now, cd) { - self.remove_packet(&p); + let mut it = self.sent_packets.iter(); + // If the first item is not expired, do nothing. + if it.next().map_or(false, |(_, p)| p.expired(now, cd)) { + // Find the index of the first unexpired packet. + let to_remove = if let Some(first_keep) = + it.find_map(|(i, p)| if p.expired(now, cd) { None } else { Some(*i) }) + { + // Some packets haven't expired, so keep those. + let keep = self.sent_packets.split_off(&first_keep); + mem::replace(&mut self.sent_packets, keep) + } else { + // All packets are expired. + mem::take(&mut self.sent_packets) + }; + for (_, p) in to_remove { + self.remove_packet(&p); + } } } @@ -323,39 +401,44 @@ impl LossRecoverySpace { let largest_acked = self.largest_acked; - for packet in self + // Lost for retrans/CC purposes + let mut lost_pns = SmallVec::<[_; 8]>::new(); + + for (pn, packet) in self .sent_packets .iter_mut() // BTreeMap iterates in order of ascending PN - .take_while(|p| p.pn() < largest_acked.unwrap_or(PacketNumber::MAX)) + .take_while(|(&k, _)| k < largest_acked.unwrap_or(PacketNumber::MAX)) { // Packets sent before now - loss_delay are deemed lost. - if packet.time_sent() + loss_delay <= now { + if packet.time_sent + loss_delay <= now { qtrace!( "lost={}, time sent {:?} is before lost_delay {:?}", - packet.pn(), - packet.time_sent(), + pn, + packet.time_sent, loss_delay ); - } else if largest_acked >= Some(packet.pn() + PACKET_THRESHOLD) { + } else if largest_acked >= Some(*pn + PACKET_THRESHOLD) { qtrace!( "lost={}, is >= {} from largest acked {:?}", - packet.pn(), + pn, PACKET_THRESHOLD, largest_acked ); } else { if largest_acked.is_some() { - self.first_ooo_time = Some(packet.time_sent()); + self.first_ooo_time = Some(packet.time_sent); } // No more packets can be declared lost after this one. break; }; if packet.declare_lost(now) { - lost_packets.push(packet.clone()); + lost_pns.push(*pn); } } + + lost_packets.extend(lost_pns.iter().map(|pn| self.sent_packets[pn].clone())); } } @@ -545,8 +628,8 @@ impl LossRecovery { } pub fn on_packet_sent(&mut self, path: &PathRef, mut sent_packet: SentPacket) { - let pn_space = PacketNumberSpace::from(sent_packet.packet_type()); - qdebug!([self], "packet {}-{} sent", pn_space, sent_packet.pn()); + let pn_space = PacketNumberSpace::from(sent_packet.pt); + qdebug!([self], "packet {}-{} sent", pn_space, sent_packet.pn); if let Some(space) = self.spaces.get_mut(pn_space) { path.borrow_mut().packet_sent(&mut sent_packet); space.on_packet_sent(sent_packet); @@ -555,7 +638,7 @@ impl LossRecovery { [self], "ignoring {}-{} from dropped space", pn_space, - sent_packet.pn() + sent_packet.pn ); } } @@ -586,13 +669,13 @@ impl LossRecovery { &mut self, primary_path: &PathRef, pn_space: PacketNumberSpace, - largest_acked: PacketNumber, + largest_acked: u64, acked_ranges: R, ack_delay: Duration, now: Instant, ) -> (Vec, Vec) where - R: IntoIterator>, + R: IntoIterator>, R::IntoIter: ExactSizeIterator, { qdebug!( @@ -622,11 +705,11 @@ impl LossRecovery { // If the largest acknowledged is newly acked and any newly acked // packet was ack-eliciting, update the RTT. (-recovery 5.1) let largest_acked_pkt = acked_packets.first().expect("must be there"); - space.largest_acked_sent_time = Some(largest_acked_pkt.time_sent()); + space.largest_acked_sent_time = Some(largest_acked_pkt.time_sent); if any_ack_eliciting && largest_acked_pkt.on_primary_path() { self.rtt_sample( primary_path.borrow_mut().rtt_mut(), - largest_acked_pkt.time_sent(), + largest_acked_pkt.time_sent, now, ack_delay, ); @@ -934,7 +1017,6 @@ impl ::std::fmt::Display for LossRecovery { mod tests { use std::{ cell::RefCell, - convert::TryInto, ops::{Deref, DerefMut, RangeInclusive}, rc::Rc, time::{Duration, Instant}, @@ -949,7 +1031,7 @@ mod tests { use crate::{ cc::CongestionControlAlgorithm, cid::{ConnectionId, ConnectionIdEntry}, - packet::{PacketNumber, PacketType}, + packet::PacketType, path::{Path, PathRef}, rtt::RttEstimate, stats::{Stats, StatsCell}, @@ -976,8 +1058,8 @@ mod tests { pub fn on_ack_received( &mut self, pn_space: PacketNumberSpace, - largest_acked: PacketNumber, - acked_ranges: Vec>, + largest_acked: u64, + acked_ranges: Vec>, ack_delay: Duration, now: Instant, ) -> (Vec, Vec) { @@ -1146,8 +1228,8 @@ mod tests { ); } - fn add_sent(lrs: &mut LossRecoverySpace, max_pn: PacketNumber) { - for pn in 0..=max_pn { + fn add_sent(lrs: &mut LossRecoverySpace, packet_numbers: &[u64]) { + for &pn in packet_numbers { lrs.on_packet_sent(SentPacket::new( PacketType::Short, pn, @@ -1159,18 +1241,15 @@ mod tests { } } - fn match_acked(acked: &[SentPacket], expected: &[PacketNumber]) { - assert_eq!( - acked.iter().map(SentPacket::pn).collect::>(), - expected - ); + fn match_acked(acked: &[SentPacket], expected: &[u64]) { + assert!(acked.iter().map(|p| &p.pn).eq(expected)); } #[test] fn remove_acked() { let mut lrs = LossRecoverySpace::new(PacketNumberSpace::ApplicationData); let mut stats = Stats::default(); - add_sent(&mut lrs, 10); + add_sent(&mut lrs, &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); let (acked, _) = lrs.remove_acked(vec![], &mut stats); assert!(acked.is_empty()); let (acked, _) = lrs.remove_acked(vec![7..=8, 2..=4], &mut stats); @@ -1178,7 +1257,7 @@ mod tests { let (acked, _) = lrs.remove_acked(vec![8..=11], &mut stats); match_acked(&acked, &[10, 9]); let (acked, _) = lrs.remove_acked(vec![0..=2], &mut stats); - match_acked(&acked, &[1, 0]); + match_acked(&acked, &[1]); let (acked, _) = lrs.remove_acked(vec![5..=6], &mut stats); match_acked(&acked, &[6, 5]); } @@ -1413,7 +1492,7 @@ mod tests { PacketType::Short, ] { let sent_pkt = SentPacket::new(*sp, 1, pn_time(3), true, Vec::new(), ON_SENT_SIZE); - let pn_space = PacketNumberSpace::from(sent_pkt.packet_type()); + let pn_space = PacketNumberSpace::from(sent_pkt.pt); lr.on_packet_sent(sent_pkt); lr.on_ack_received(pn_space, 1, vec![1..=1], Duration::from_secs(0), pn_time(3)); let mut lost = Vec::new(); @@ -1514,7 +1593,7 @@ mod tests { lr.on_packet_sent(SentPacket::new( PacketType::Initial, - 0, + 1, now(), true, Vec::new(), diff --git a/neqo-transport/src/recovery/sent.rs b/neqo-transport/src/recovery/sent.rs deleted file mode 100644 index 33d77b0ff..000000000 --- a/neqo-transport/src/recovery/sent.rs +++ /dev/null @@ -1,413 +0,0 @@ -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -// A collection for sent packets. - -use std::{ - cmp::min, - collections::VecDeque, - convert::TryFrom, - ops::RangeInclusive, - time::{Duration, Instant}, -}; - -use crate::{ - packet::{PacketNumber, PacketType}, - recovery::RecoveryToken, -}; - -#[derive(Debug, Clone)] -pub struct SentPacket { - pt: PacketType, - pn: PacketNumber, - ack_eliciting: bool, - time_sent: Instant, - primary_path: bool, - tokens: Vec, - - time_declared_lost: Option, - /// After a PTO, this is true when the packet has been released. - pto: bool, - - len: usize, -} - -impl SentPacket { - pub fn new( - pt: PacketType, - pn: PacketNumber, - time_sent: Instant, - ack_eliciting: bool, - tokens: Vec, - len: usize, - ) -> Self { - Self { - pt, - pn, - time_sent, - ack_eliciting, - primary_path: true, - tokens, - time_declared_lost: None, - pto: false, - len, - } - } - - /// The type of this packet. - pub fn packet_type(&self) -> PacketType { - self.pt - } - - /// The number of the packet. - pub fn pn(&self) -> PacketNumber { - self.pn - } - - /// The time that this packet was sent. - pub fn time_sent(&self) -> Instant { - self.time_sent - } - - /// Returns `true` if the packet will elicit an ACK. - pub fn ack_eliciting(&self) -> bool { - self.ack_eliciting - } - - /// Returns `true` if the packet was sent on the primary path. - pub fn on_primary_path(&self) -> bool { - self.primary_path - } - - /// The length of the packet that was sent. - pub fn len(&self) -> usize { - self.len - } - - /// Access the recovery tokens that this holds. - pub fn tokens(&self) -> &[RecoveryToken] { - &self.tokens - } - - /// Clears the flag that had this packet on the primary path. - /// Used when migrating to clear out state. - pub fn clear_primary_path(&mut self) { - self.primary_path = false; - } - - /// For Initial packets, it is possible that the packet builder needs to amend the length. - pub fn add_padding(&mut self, padding: usize) { - debug_assert_eq!(self.pt, PacketType::Initial); - self.len += padding; - } - - /// Whether the packet has been declared lost. - pub fn lost(&self) -> bool { - self.time_declared_lost.is_some() - } - - /// Whether accounting for the loss or acknowledgement in the - /// congestion controller is pending. - /// Returns `true` if the packet counts as being "in flight", - /// and has not previously been declared lost. - /// Note that this should count packets that contain only ACK and PADDING, - /// but we don't send PADDING, so we don't track that. - pub fn cc_outstanding(&self) -> bool { - self.ack_eliciting() && self.on_primary_path() && !self.lost() - } - - /// Whether the packet should be tracked as in-flight. - pub fn cc_in_flight(&self) -> bool { - self.ack_eliciting() && self.on_primary_path() - } - - /// Declare the packet as lost. Returns `true` if this is the first time. - pub fn declare_lost(&mut self, now: Instant) -> bool { - if self.lost() { - false - } else { - self.time_declared_lost = Some(now); - true - } - } - - /// Ask whether this tracked packet has been declared lost for long enough - /// that it can be expired and no longer tracked. - pub fn expired(&self, now: Instant, expiration_period: Duration) -> bool { - self.time_declared_lost - .map_or(false, |loss_time| (loss_time + expiration_period) <= now) - } - - /// Whether the packet contents were cleared out after a PTO. - pub fn pto_fired(&self) -> bool { - self.pto - } - - /// On PTO, we need to get the recovery tokens so that we can ensure that - /// the frames we sent can be sent again in the PTO packet(s). Do that just once. - pub fn pto(&mut self) -> bool { - if self.pto || self.lost() { - false - } else { - self.pto = true; - true - } - } -} - -/// A collection for packets that we have sent that haven't been acknowledged. -#[derive(Debug, Default)] -pub struct SentPackets { - /// The collection. - packets: VecDeque>, - /// The packet number of the first item in the collection. - offset: PacketNumber, - /// The number of `Some` values in the packet. This is cached to keep things squeaky-fast. - len: usize, -} - -impl SentPackets { - pub fn len(&self) -> usize { - self.len - } - - pub fn track(&mut self, packet: SentPacket) { - if self.offset + PacketNumber::try_from(self.packets.len()).unwrap() != packet.pn { - assert_eq!( - self.len, 0, - "packet number skipping only supported for the first packet in a space" - ); - self.offset = packet.pn; - } - self.len += 1; - self.packets.push_back(Some(packet)); - } - - pub fn iter_mut(&mut self) -> impl Iterator { - self.packets.iter_mut().flatten() - } - - /// Take values from a specified range of packet numbers. - /// Note that this will not remove values unless the iterator is consumed. - /// The values returned will be reversed, so that the most recent packet appears first. - /// This is because ACK frames arrive with ranges starting from the largest acknowledged - /// and we want to match that. - pub fn take_range(&mut self, r: RangeInclusive, store: &mut Vec) { - let start = usize::try_from((*r.start()).saturating_sub(self.offset)).unwrap(); - let end = min( - usize::try_from((*r.end() + 1).saturating_sub(self.offset)).unwrap(), - self.packets.len(), - ); - - let before = store.len(); - if self.packets.range(..start).all(Option::is_none) { - // If there are extra empty slots, split those off too. - let extra = self - .packets - .range(end..) - .take_while(|&p| p.is_none()) - .count(); - self.offset += u64::try_from(end + extra).unwrap(); - let mut other = self.packets.split_off(end + extra); - std::mem::swap(&mut self.packets, &mut other); - store.extend( - other - .into_iter() - .rev() - .skip(extra) - .take(end - start) - .flatten(), - ); - } else { - store.extend( - self.packets - .range_mut(start..end) - .rev() - .filter_map(Option::take), - ); - } - self.len -= store.len() - before; - } - - /// Empty out the packets, but keep the offset. - pub fn drain_all(&mut self) -> impl Iterator { - self.len = 0; - self.offset += u64::try_from(self.packets.len()).unwrap(); - std::mem::take(&mut self.packets).into_iter().flatten() - } - - /// See `LossRecoverySpace::remove_old_lost` for details on `now` and `cd`. - pub fn remove_expired( - &mut self, - now: Instant, - cd: Duration, - ) -> impl Iterator { - let mut count = 0; - // Find the first unexpired packet and only keep from that one onwards. - for (i, p) in self.packets.iter().enumerate() { - if p.as_ref().map_or(false, |p| !p.expired(now, cd)) { - let mut other = self.packets.split_off(i); - self.len -= count; - self.offset += u64::try_from(i).unwrap(); - std::mem::swap(&mut self.packets, &mut other); - return other.into_iter().flatten(); - } - // Count `Some` values that we are removing. - count += usize::from(p.is_some()); - } - - self.len = 0; - self.offset += u64::try_from(self.packets.len()).unwrap(); - std::mem::take(&mut self.packets).into_iter().flatten() - } -} - -#[cfg(test)] -mod tests { - use std::{ - cell::OnceCell, - convert::TryFrom, - time::{Duration, Instant}, - }; - - use super::{SentPacket, SentPackets}; - use crate::packet::{PacketNumber, PacketType}; - - const PACKET_GAP: Duration = Duration::from_secs(1); - fn start_time() -> Instant { - thread_local!(static STARTING_TIME: OnceCell = OnceCell::new()); - STARTING_TIME.with(|t| *t.get_or_init(Instant::now)) - } - - fn pkt(n: u32) -> SentPacket { - SentPacket::new( - PacketType::Short, - PacketNumber::from(n), - start_time() + (PACKET_GAP * n), - true, - Vec::new(), - 100, - ) - } - - fn pkts() -> SentPackets { - let mut pkts = SentPackets::default(); - pkts.track(pkt(0)); - pkts.track(pkt(1)); - pkts.track(pkt(2)); - assert_eq!(pkts.len(), 3); - pkts - } - - trait HasPacketNumber { - fn pn(&self) -> PacketNumber; - } - impl HasPacketNumber for SentPacket { - fn pn(&self) -> PacketNumber { - self.pn - } - } - impl HasPacketNumber for &'_ SentPacket { - fn pn(&self) -> PacketNumber { - self.pn - } - } - impl HasPacketNumber for &'_ mut SentPacket { - fn pn(&self) -> PacketNumber { - self.pn - } - } - - fn remove_one(pkts: &mut SentPackets, idx: PacketNumber) { - assert_eq!(pkts.len(), 3); - let mut store = Vec::new(); - pkts.take_range(idx..=idx, &mut store); - let mut it = store.into_iter(); - assert_eq!(idx, it.next().unwrap().pn()); - assert!(it.next().is_none()); - std::mem::drop(it); - assert_eq!(pkts.len(), 2); - } - - fn assert_zero_and_two<'a, 'b: 'a>( - mut it: impl Iterator + 'a, - ) { - assert_eq!(it.next().unwrap().pn(), 0); - assert_eq!(it.next().unwrap().pn(), 2); - assert!(it.next().is_none()); - } - - #[test] - fn iterate_skipped() { - let mut pkts = pkts(); - for (i, p) in pkts.packets.iter().enumerate() { - assert_eq!(i, usize::try_from(p.as_ref().unwrap().pn).unwrap()); - } - remove_one(&mut pkts, 1); - - // Validate the merged result multiple ways. - assert_zero_and_two(pkts.iter_mut()); - - { - // Reverse the expectations here as this iterator reverses its output. - let mut store = Vec::new(); - pkts.take_range(0..=2, &mut store); - let mut it = store.into_iter(); - assert_eq!(it.next().unwrap().pn(), 2); - assert_eq!(it.next().unwrap().pn(), 0); - assert!(it.next().is_none()); - }; - - // The None values are still there in this case, so offset is 0. - assert_eq!(pkts.offset, 3); - assert_eq!(pkts.packets.len(), 0); - assert_eq!(pkts.len(), 0); - } - - #[test] - fn drain() { - let mut pkts = pkts(); - remove_one(&mut pkts, 1); - - assert_zero_and_two(pkts.drain_all()); - assert_eq!(pkts.offset, 3); - assert_eq!(pkts.len(), 0); - } - - #[test] - fn remove_expired() { - let mut pkts = pkts(); - remove_one(&mut pkts, 0); - - for p in pkts.iter_mut() { - p.declare_lost(p.time_sent); // just to keep things simple. - } - - // Expire up to pkt(1). - let mut it = pkts.remove_expired(start_time() + PACKET_GAP, Duration::new(0, 0)); - assert_eq!(it.next().unwrap().pn(), 1); - assert!(it.next().is_none()); - std::mem::drop(it); - - assert_eq!(pkts.offset, 2); - assert_eq!(pkts.len(), 1); - } - - #[test] - #[should_panic(expected = "packet number skipping only supported for the first packet")] - fn skipped_not_ok() { - let mut pkts = pkts(); - pkts.track(pkt(4)); - } - - #[test] - fn first_skipped_ok() { - let mut pkts = SentPackets::default(); - pkts.track(pkt(4)); // This is fine. - assert_eq!(pkts.offset, 4); - assert_eq!(pkts.len(), 1); - } -} diff --git a/neqo-transport/src/recovery/token.rs b/neqo-transport/src/recovery/token.rs deleted file mode 100644 index 93f84268c..000000000 --- a/neqo-transport/src/recovery/token.rs +++ /dev/null @@ -1,63 +0,0 @@ -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use crate::{ - ackrate::AckRate, - cid::ConnectionIdEntry, - crypto::CryptoRecoveryToken, - quic_datagrams::DatagramTracking, - send_stream::SendStreamRecoveryToken, - stream_id::{StreamId, StreamType}, - tracking::AckToken, -}; - -#[derive(Debug, Clone)] -#[allow(clippy::module_name_repetitions)] -pub enum StreamRecoveryToken { - Stream(SendStreamRecoveryToken), - ResetStream { - stream_id: StreamId, - }, - StopSending { - stream_id: StreamId, - }, - - MaxData(u64), - DataBlocked(u64), - - MaxStreamData { - stream_id: StreamId, - max_data: u64, - }, - StreamDataBlocked { - stream_id: StreamId, - limit: u64, - }, - - MaxStreams { - stream_type: StreamType, - max_streams: u64, - }, - StreamsBlocked { - stream_type: StreamType, - limit: u64, - }, -} - -#[derive(Debug, Clone)] -#[allow(clippy::module_name_repetitions)] -pub enum RecoveryToken { - Stream(StreamRecoveryToken), - Ack(AckToken), - Crypto(CryptoRecoveryToken), - HandshakeDone, - KeepAlive, // Special PING. - NewToken(usize), - NewConnectionId(ConnectionIdEntry<[u8; 16]>), - RetireConnectionId(u64), - AckFrequency(AckRate), - Datagram(DatagramTracking), -} diff --git a/neqo-transport/src/sender.rs b/neqo-transport/src/sender.rs index 0d3da70fa..3a5485153 100644 --- a/neqo-transport/src/sender.rs +++ b/neqo-transport/src/sender.rs @@ -18,8 +18,8 @@ use neqo_common::qlog::NeqoQlog; use crate::{ cc::{ClassicCongestionControl, CongestionControl, CongestionControlAlgorithm, Cubic, NewReno}, pace::Pacer, - recovery::SentPacket, rtt::RttEstimate, + tracking::SentPacket, }; /// The number of packets we allow to burst from the pacer. @@ -109,7 +109,7 @@ impl PacketSender { pub fn on_packet_sent(&mut self, pkt: &SentPacket, rtt: Duration) { self.pacer - .spend(pkt.time_sent(), rtt, self.cc.cwnd(), pkt.len()); + .spend(pkt.time_sent, rtt, self.cc.cwnd(), pkt.size); self.cc.on_packet_sent(pkt); } diff --git a/neqo-transport/src/tracking.rs b/neqo-transport/src/tracking.rs index 44a0bef4c..bdd0f250c 100644 --- a/neqo-transport/src/tracking.rs +++ b/neqo-transport/src/tracking.rs @@ -130,6 +130,114 @@ impl std::fmt::Debug for PacketNumberSpaceSet { } } +#[derive(Debug, Clone)] +pub struct SentPacket { + pub pt: PacketType, + pub pn: PacketNumber, + ack_eliciting: bool, + pub time_sent: Instant, + primary_path: bool, + pub tokens: Vec, + + time_declared_lost: Option, + /// After a PTO, this is true when the packet has been released. + pto: bool, + + pub size: usize, +} + +impl SentPacket { + pub fn new( + pt: PacketType, + pn: PacketNumber, + time_sent: Instant, + ack_eliciting: bool, + tokens: Vec, + size: usize, + ) -> Self { + Self { + pt, + pn, + time_sent, + ack_eliciting, + primary_path: true, + tokens, + time_declared_lost: None, + pto: false, + size, + } + } + + /// Returns `true` if the packet will elicit an ACK. + pub fn ack_eliciting(&self) -> bool { + self.ack_eliciting + } + + /// Returns `true` if the packet was sent on the primary path. + pub fn on_primary_path(&self) -> bool { + self.primary_path + } + + /// Clears the flag that had this packet on the primary path. + /// Used when migrating to clear out state. + pub fn clear_primary_path(&mut self) { + self.primary_path = false; + } + + /// Whether the packet has been declared lost. + pub fn lost(&self) -> bool { + self.time_declared_lost.is_some() + } + + /// Whether accounting for the loss or acknowledgement in the + /// congestion controller is pending. + /// Returns `true` if the packet counts as being "in flight", + /// and has not previously been declared lost. + /// Note that this should count packets that contain only ACK and PADDING, + /// but we don't send PADDING, so we don't track that. + pub fn cc_outstanding(&self) -> bool { + self.ack_eliciting() && self.on_primary_path() && !self.lost() + } + + /// Whether the packet should be tracked as in-flight. + pub fn cc_in_flight(&self) -> bool { + self.ack_eliciting() && self.on_primary_path() + } + + /// Declare the packet as lost. Returns `true` if this is the first time. + pub fn declare_lost(&mut self, now: Instant) -> bool { + if self.lost() { + false + } else { + self.time_declared_lost = Some(now); + true + } + } + + /// Ask whether this tracked packet has been declared lost for long enough + /// that it can be expired and no longer tracked. + pub fn expired(&self, now: Instant, expiration_period: Duration) -> bool { + self.time_declared_lost + .map_or(false, |loss_time| (loss_time + expiration_period) <= now) + } + + /// Whether the packet contents were cleared out after a PTO. + pub fn pto_fired(&self) -> bool { + self.pto + } + + /// On PTO, we need to get the recovery tokens so that we can ensure that + /// the frames we sent can be sent again in the PTO packet(s). Do that just once. + pub fn pto(&mut self) -> bool { + if self.pto || self.lost() { + false + } else { + self.pto = true; + true + } + } +} + impl std::fmt::Display for PacketNumberSpace { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.write_str(match self {