diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs
index e3d7a6cf18..1e5463d6ca 100644
--- a/quinn/src/endpoint.rs
+++ b/quinn/src/endpoint.rs
@@ -29,7 +29,7 @@ use udp::{RecvMeta, BATCH_SIZE};
 
 use crate::{
     connection::Connecting, work_limiter::WorkLimiter, ConnectionEvent, EndpointConfig, VarInt,
-    IO_LOOP_BOUND, MAX_TRANSMIT_QUEUE_CONTENTS_LEN, RECV_TIME_BOUND, SEND_TIME_BOUND,
+    IO_LOOP_BOUND, RECV_TIME_BOUND,
 };
 
 /// A QUIC endpoint.
@@ -216,7 +216,6 @@ impl Endpoint {
         let socket = self.runtime.wrap_udp_socket(socket)?;
         let mut inner = self.inner.state.lock().unwrap();
         inner.socket = socket;
-        inner.io_poller = inner.socket.clone().create_io_poller();
         inner.ipv6 = addr.is_ipv6();
 
         // Update connection socket references
@@ -332,7 +331,6 @@ impl Future for EndpointDriver {
         let mut keep_going = false;
         keep_going |= endpoint.drive_recv(cx, now)?;
         keep_going |= endpoint.handle_events(cx, &self.0.shared);
-        keep_going |= endpoint.drive_send(cx)?;
 
         if !endpoint.incoming.is_empty() {
             self.0.shared.incoming.notify_waiters();
@@ -374,7 +372,6 @@ pub(crate) struct EndpointInner {
 pub(crate) struct State {
     socket: Arc<dyn AsyncUdpSocket>,
     inner: proto::Endpoint,
-    outgoing: VecDeque<udp::Transmit>,
     incoming: VecDeque<Connecting>,
     driver: Option<Waker>,
     ipv6: bool,
@@ -385,10 +382,7 @@ pub(crate) struct State {
     driver_lost: bool,
     recv_limiter: WorkLimiter,
     recv_buf: Box<[u8]>,
-    send_limiter: WorkLimiter,
     runtime: Arc<dyn Runtime>,
-    /// The aggregateed contents length of the packets in the transmit queue
-    transmit_queue_contents_len: usize,
 }
 
 #[derive(Debug)]
@@ -449,22 +443,31 @@ impl State {
                             .send(ConnectionEvent::Proto(event));
                     }
                     Some(DatagramEvent::Response(transmit)) => {
-                        // Limiting the memory usage for items queued in the outgoing queue from endpoint
-                        // generated packets. Otherwise, we may see a build-up of the queue under test with
-                        // flood of initial packets against the endpoint. The sender with the sender-limiter
-                        // may not keep up the pace of these packets queued into the queue.
-                        if self.transmit_queue_contents_len
-                            < MAX_TRANSMIT_QUEUE_CONTENTS_LEN
-                        {
-                            let contents_len = transmit.size;
-                            self.outgoing.push_back(udp_transmit(
-                                transmit,
-                                buffer.split_to(contents_len).freeze(),
-                            ));
-                            self.transmit_queue_contents_len = self
-                                .transmit_queue_contents_len
-                                .saturating_add(contents_len);
-                        }
+                        // Send if there's kernel buffer space; otherwise, drop it
+                        //
+                        // As an endpoint-generated packet, we know this is an
+                        // immediate, stateless response to an unconnected peer,
+                        // one of:
+                        //
+                        // - A version negotiation response due to an unknown version
+                        // - A `CLOSE` due to a malformed or unwanted connection attempt
+                        // - A stateless reset due to an unrecognized connection
+                        // - A `Retry` packet due to a connection attempt when
+                        //   `use_retry` is set
+                        //
+                        // In each case, a well-behaved peer can be trusted to retry a
+                        // few times, which is guaranteed to produce the same response
+                        // from us. Repeated failures might at worst cause a peer's new
+                        // connection attempt to time out, which is acceptable if we're
+                        // under such heavy load that there's never room for this code
+                        // to transmit. This is morally equivalent to the packet getting
+                        // lost due to congestion further along the link, which
+                        // similarly relies on peer retries for recovery.
+                        let contents_len = transmit.size;
+                        _ = self.socket.try_send(&[udp_transmit(
+                            transmit,
+                            buffer.split_to(contents_len).freeze(),
+                        )]);
                     }
                     None => {}
                 }
@@ -493,46 +496,6 @@ impl State {
         Ok(false)
     }
 
-    fn drive_send(&mut self, cx: &mut Context) -> Result<bool, io::Error> {
-        self.send_limiter.start_cycle();
-
-        let result = loop {
-            if self.outgoing.is_empty() {
-                break Ok(false);
-            }
-
-            if !self.send_limiter.allow_work() {
-                break Ok(true);
-            }
-
-            if let Poll::Pending = self.io_poller.as_mut().poll_writable(cx)? {
-                break Ok(false);
-            }
-
-            match self.socket.try_send(self.outgoing.as_slices().0) {
-                Ok(n) => {
-                    let contents_len: usize =
-                        self.outgoing.drain(..n).map(|t| t.contents.len()).sum();
-                    self.transmit_queue_contents_len = self
-                        .transmit_queue_contents_len
-                        .saturating_sub(contents_len);
-                    // We count transmits instead of `poll_send` calls since the cost
-                    // of a `sendmmsg` still linearly increases with number of packets.
-                    self.send_limiter.record_work(n);
-                }
-                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
-                    continue;
-                }
-                Err(e) => {
-                    break Err(e);
-                }
-            }
-        };
-
-        self.send_limiter.finish_cycle();
-        result
-    }
-
     fn handle_events(&mut self, cx: &mut Context, shared: &Shared) -> bool {
         for _ in 0..IO_LOOP_BOUND {
             let (ch, event) = match self.events.poll_recv(cx) {
@@ -678,7 +641,6 @@ impl EndpointRef {
                 inner,
                 ipv6,
                 events,
-                outgoing: VecDeque::new(),
                 incoming: VecDeque::new(),
                 driver: None,
                 connections: ConnectionSet {
@@ -690,9 +652,7 @@ impl EndpointRef {
                 driver_lost: false,
                 recv_buf: recv_buf.into(),
                 recv_limiter: WorkLimiter::new(RECV_TIME_BOUND),
-                send_limiter: WorkLimiter::new(SEND_TIME_BOUND),
                 runtime,
-                transmit_queue_contents_len: 0,
             }),
         }))
     }
diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs
index 2b0ae574cd..bb1bbfe843 100644
--- a/quinn/src/lib.rs
+++ b/quinn/src/lib.rs
@@ -110,14 +110,6 @@ const IO_LOOP_BOUND: usize = 160;
 /// batch of size 32 was observed to take 30us on some systems.
 const RECV_TIME_BOUND: Duration = Duration::from_micros(50);
 
-/// The maximum amount of time that should be spent in `sendmsg()` calls per endpoint iteration
-const SEND_TIME_BOUND: Duration = Duration::from_micros(50);
-
-/// The maximum size of content length of packets in the outgoing transmit queue. Transmit packets
-/// generated from the endpoint (retry or initial close) can be dropped when this limit is being execeeded.
-/// Chose to represent 100 MB of data.
-const MAX_TRANSMIT_QUEUE_CONTENTS_LEN: usize = 100_000_000;
-
 fn udp_transmit(t: proto::Transmit, buffer: Bytes) -> udp::Transmit {
     udp::Transmit {
         destination: t.destination,
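For readers outside the quinn codebase, here is a minimal, self-contained sketch of the fire-and-forget pattern this diff adopts, using only `std::net::UdpSocket` rather than quinn's `AsyncUdpSocket` abstraction. The socket address and the `send_or_drop` helper are illustrative assumptions, not part of the change: the point is that a non-blocking send returning `WouldBlock` (no kernel buffer space) is treated as a deliberate drop, not an error.

```rust
use std::io;
use std::net::UdpSocket;

/// Attempt a single non-blocking send and report whether the datagram was
/// handed to the kernel. `WouldBlock` (a full socket buffer) is treated as a
/// deliberate drop rather than a failure, mirroring how the endpoint now
/// discards stateless responses under load. Hypothetical helper for
/// illustration only.
fn send_or_drop(socket: &UdpSocket, datagram: &[u8], peer: &str) -> io::Result<bool> {
    match socket.send_to(datagram, peer) {
        Ok(_) => Ok(true),
        // No buffer space: drop the packet and rely on the peer retrying.
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(false),
        // Anything else is a real I/O error worth surfacing.
        Err(e) => Err(e),
    }
}

fn main() -> io::Result<()> {
    let socket = UdpSocket::bind("127.0.0.1:0")?;
    // Non-blocking mode makes `send_to` return `WouldBlock` instead of
    // stalling the caller when the kernel buffer is full.
    socket.set_nonblocking(true)?;

    let sent = send_or_drop(&socket, b"stateless response", "127.0.0.1:4433")?;
    println!("datagram {}", if sent { "sent" } else { "dropped (socket busy)" });
    Ok(())
}
```

The new comment in the diff is what makes this safe: every packet on this path is a stateless response, so dropping it is indistinguishable from loss further along the network path, and peer retries recover either way. That is why the `outgoing` queue, `drive_send`, the send-side `WorkLimiter`, and the `MAX_TRANSMIT_QUEUE_CONTENTS_LEN` bookkeeping can be deleted outright rather than tuned.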