diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 0dd56f1c5..9f372ae14 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -455,7 +455,12 @@ impl Connection { /// `max_datagrams` specifies how many datagrams can be returned inside a /// single Transmit using GSO. This must be at least 1. #[must_use] - pub fn poll_transmit(&mut self, now: Instant, max_datagrams: usize) -> Option { + pub fn poll_transmit( + &mut self, + now: Instant, + max_datagrams: usize, + buf: &mut BytesMut, + ) -> Option { assert!(max_datagrams != 0); let max_datagrams = match self.config.enable_segmentation_offload { false => 1, @@ -477,13 +482,14 @@ impl Connection { SpaceId::Data, "PATH_CHALLENGE queued without 1-RTT keys" ); - let mut buf = BytesMut::with_capacity(self.path.current_mtu() as usize); - let buf_capacity = self.path.current_mtu() as usize; + buf.reserve(self.path.current_mtu() as usize); + + let buf_capacity = buf.capacity(); let mut builder = PacketBuilder::new( now, SpaceId::Data, - &mut buf, + buf, buf_capacity, 0, false, @@ -501,13 +507,13 @@ impl Connection { // sending a datagram of this size builder.pad_to(MIN_INITIAL_SIZE); - builder.finish(self, &mut buf); + builder.finish(self, buf); self.stats.udp_tx.datagrams += 1; self.stats.udp_tx.ios += 1; self.stats.udp_tx.bytes += buf.len() as u64; return Some(Transmit { destination, - contents: buf.freeze(), + size: buf.len(), ecn: None, segment_size: None, src_ip: self.local_ip, @@ -549,7 +555,6 @@ impl Connection { && self.peer_supports_ack_frequency(); } - let mut buf = BytesMut::new(); // Reserving capacity can provide more capacity than we asked for. // However we are not allowed to write more than MTU size. Therefore // the maximum capacity is tracked separately. @@ -664,7 +669,7 @@ impl Connection { // which will always send the maximum PDU. builder.pad_to(self.path.current_mtu()); - builder.finish_and_track(now, self, sent_frames.take(), &mut buf); + builder.finish_and_track(now, self, sent_frames.take(), buf); debug_assert_eq!(buf.len(), buf_capacity, "Packet must be padded"); } @@ -680,7 +685,7 @@ impl Connection { // (e.g. purely containing ACKs), modern memory allocators // (e.g. mimalloc and jemalloc) will pool certain allocation sizes // and therefore this is still rather efficient. - buf.reserve(max_datagrams * self.path.current_mtu() as usize - buf.capacity()); + buf.reserve(max_datagrams * self.path.current_mtu() as usize); } num_datagrams += 1; coalesce = true; @@ -690,7 +695,7 @@ impl Connection { // datagram. // Finish current packet without adding extra padding if let Some(builder) = builder.take() { - builder.finish_and_track(now, self, sent_frames.take(), &mut buf); + builder.finish_and_track(now, self, sent_frames.take(), buf); } } @@ -723,7 +728,7 @@ impl Connection { let builder = builder.get_or_insert(PacketBuilder::new( now, space_id, - &mut buf, + buf, buf_capacity, (num_datagrams - 1) * (self.path.current_mtu() as usize), ack_eliciting, @@ -748,7 +753,7 @@ impl Connection { self.receiving_ecn, &mut SentFrames::default(), &mut self.spaces[space_id], - &mut buf, + buf, &mut self.stats, ); } @@ -764,14 +769,14 @@ impl Connection { match self.state { State::Closed(state::Closed { ref reason }) => { if space_id == SpaceId::Data { - reason.encode(&mut buf, builder.max_size) + reason.encode(buf, builder.max_size) } else { frame::ConnectionClose { error_code: TransportErrorCode::APPLICATION_ERROR, frame_type: None, reason: Bytes::new(), } - .encode(&mut buf, builder.max_size) + .encode(buf, builder.max_size) } } State::Draining => frame::ConnectionClose { @@ -779,7 +784,7 @@ impl Connection { frame_type: None, reason: Bytes::new(), } - .encode(&mut buf, builder.max_size), + .encode(buf, builder.max_size), _ => unreachable!( "tried to make a close packet when the connection wasn't closed" ), @@ -794,7 +799,7 @@ impl Connection { let sent = self.populate_packet( now, space_id, - &mut buf, + buf, buf_capacity - builder.tag_len, builder.exact_number, ); @@ -831,7 +836,7 @@ impl Connection { builder.pad_to(MIN_INITIAL_SIZE); } let last_packet_number = builder.exact_number; - builder.finish_and_track(now, self, sent_frames, &mut buf); + builder.finish_and_track(now, self, sent_frames, buf); self.path .congestion .on_sent(now, buf.len() as u64, last_packet_number); @@ -857,7 +862,7 @@ impl Connection { let mut builder = PacketBuilder::new( now, space_id, - &mut buf, + buf, buf_capacity, 0, true, @@ -880,7 +885,7 @@ impl Connection { non_retransmits: true, ..Default::default() }; - builder.finish_and_track(now, self, Some(sent_frames), &mut buf); + builder.finish_and_track(now, self, Some(sent_frames), buf); self.stats.path.sent_plpmtud_probes += 1; num_datagrams = 1; @@ -901,7 +906,7 @@ impl Connection { Some(Transmit { destination: self.path.remote, - contents: buf.freeze(), + size: buf.len(), ecn: if self.path.sending_ecn { Some(EcnCodepoint::Ect0) } else { @@ -3415,6 +3420,13 @@ impl Connection { self.state = State::Drained; self.endpoint_events.push_back(EndpointEventInner::Drained); } + + /// Storage size required for the largest packet known to be supported by the current path + /// + /// Buffers passed to [`Connection::poll_transmit`] should be at least this large. + pub fn current_mtu(&self) -> u16 { + self.path.current_mtu() + } } impl fmt::Debug for Connection { diff --git a/quinn-proto/src/endpoint.rs b/quinn-proto/src/endpoint.rs index 2cc0677b3..3d61881d1 100644 --- a/quinn-proto/src/endpoint.rs +++ b/quinn-proto/src/endpoint.rs @@ -127,6 +127,7 @@ impl Endpoint { local_ip: Option, ecn: Option, data: BytesMut, + buf: &mut BytesMut, ) -> Option { let datagram_len = data.len(); let (first_decode, remaining) = match PartialDecode::new( @@ -147,13 +148,12 @@ impl Endpoint { } trace!("sending version negotiation"); // Negotiate versions - let mut buf = BytesMut::new(); Header::VersionNegotiate { random: self.rng.gen::() | 0x40, src_cid: dst_cid, dst_cid: src_cid, } - .encode(&mut buf); + .encode(buf); // Grease with a reserved version if version != 0x0a1a_2a3a { buf.write::(0x0a1a_2a3a); @@ -166,7 +166,7 @@ impl Endpoint { return Some(DatagramEvent::Response(Transmit { destination: remote, ecn: None, - contents: buf.freeze(), + size: buf.len(), segment_size: None, src_ip: local_ip, })); @@ -205,7 +205,7 @@ impl Endpoint { None => { debug!("packet for unrecognized connection {}", dst_cid); return self - .stateless_reset(datagram_len, addresses, dst_cid) + .stateless_reset(datagram_len, addresses, dst_cid, buf) .map(DatagramEvent::Response); } }; @@ -233,7 +233,7 @@ impl Endpoint { }; return match first_decode.finish(Some(&*crypto.header.remote)) { Ok(packet) => { - self.handle_first_packet(now, addresses, ecn, packet, remaining, &crypto) + self.handle_first_packet(now, addresses, ecn, packet, remaining, &crypto, buf) } Err(e) => { trace!("unable to decode initial packet: {}", e); @@ -254,7 +254,7 @@ impl Endpoint { // if !dst_cid.is_empty() { return self - .stateless_reset(datagram_len, addresses, dst_cid) + .stateless_reset(datagram_len, addresses, dst_cid, buf) .map(DatagramEvent::Response); } @@ -267,6 +267,7 @@ impl Endpoint { inciting_dgram_len: usize, addresses: FourTuple, dst_cid: &ConnectionId, + buf: &mut BytesMut, ) -> Option { /// Minimum amount of padding for the stateless reset to look like a short-header packet const MIN_PADDING_LEN: usize = 5; @@ -285,7 +286,6 @@ impl Endpoint { "sending stateless reset for {} to {}", dst_cid, addresses.remote ); - let mut buf = BytesMut::new(); // Resets with at least this much padding can't possibly be distinguished from real packets const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE; let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN { @@ -304,7 +304,7 @@ impl Endpoint { Some(Transmit { destination: addresses.remote, ecn: None, - contents: buf.freeze(), + size: buf.len(), segment_size: None, src_ip: addresses.local_ip, }) @@ -404,6 +404,7 @@ impl Endpoint { mut packet: Packet, rest: Option, crypto: &Keys, + buf: &mut BytesMut, ) -> Option { let (src_cid, dst_cid, token, packet_number, version) = match packet.header { Header::Initial { @@ -444,6 +445,7 @@ impl Endpoint { crypto, &src_cid, TransportError::CONNECTION_REFUSED(""), + buf, ))); } @@ -460,6 +462,7 @@ impl Endpoint { crypto, &src_cid, TransportError::PROTOCOL_VIOLATION("invalid destination CID length"), + buf, ))); } @@ -488,16 +491,15 @@ impl Endpoint { version, }; - let mut buf = BytesMut::new(); - let encode = header.encode(&mut buf); + let encode = header.encode(buf); buf.put_slice(&token); - buf.extend_from_slice(&server_config.crypto.retry_tag(version, &dst_cid, &buf)); - encode.finish(&mut buf, &*crypto.header.local, None); + buf.extend_from_slice(&server_config.crypto.retry_tag(version, &dst_cid, buf)); + encode.finish(buf, &*crypto.header.local, None); return Some(DatagramEvent::Response(Transmit { destination: addresses.remote, ecn: None, - contents: buf.freeze(), + size: buf.len(), segment_size: None, src_ip: addresses.local_ip, })); @@ -522,6 +524,7 @@ impl Endpoint { crypto, &src_cid, TransportError::INVALID_TOKEN(""), + buf, ))); } } @@ -569,7 +572,7 @@ impl Endpoint { self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained)); match e { ConnectionError::TransportError(e) => Some(DatagramEvent::Response( - self.initial_close(version, addresses, crypto, &src_cid, e), + self.initial_close(version, addresses, crypto, &src_cid, e, buf), )), _ => None, } @@ -630,6 +633,7 @@ impl Endpoint { crypto: &Keys, remote_id: &ConnectionId, reason: TransportError, + buf: &mut BytesMut, ) -> Transmit { // We don't need to worry about CID collisions in initial closes because the peer // shouldn't respond, and if it does, and the CID collides, we'll just drop the @@ -644,21 +648,16 @@ impl Endpoint { version, }; - let mut buf = BytesMut::new(); - let partial_encode = header.encode(&mut buf); + let partial_encode = header.encode(buf); let max_len = INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len(); - frame::Close::from(reason).encode(&mut buf, max_len); + frame::Close::from(reason).encode(buf, max_len); buf.resize(buf.len() + crypto.packet.local.tag_len(), 0); - partial_encode.finish( - &mut buf, - &*crypto.header.local, - Some((0, &*crypto.packet.local)), - ); + partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local))); Transmit { destination: addresses.remote, ecn: None, - contents: buf.freeze(), + size: buf.len(), segment_size: None, src_ip: addresses.local_ip, } diff --git a/quinn-proto/src/lib.rs b/quinn-proto/src/lib.rs index e3eb51d47..d271390f0 100644 --- a/quinn-proto/src/lib.rs +++ b/quinn-proto/src/lib.rs @@ -38,7 +38,6 @@ mod tests; pub mod transport_parameters; mod varint; -use bytes::Bytes; pub use varint::{VarInt, VarIntBoundsExceeded}; mod connection; @@ -281,8 +280,8 @@ pub struct Transmit { pub destination: SocketAddr, /// Explicit congestion notification bits to set on the packet pub ecn: Option, - /// Contents of the datagram - pub contents: Bytes, + /// Amount of data written to the caller-supplied buffer + pub size: usize, /// The segment size if this transmission contains multiple datagrams. /// This is `None` if the transmit only contains a single datagram pub segment_size: Option, diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index 583466331..c41f3592d 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -7,7 +7,7 @@ use std::{ }; use assert_matches::assert_matches; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use hex_literal::hex; use rand::RngCore; use ring::hmac; @@ -34,6 +34,7 @@ fn version_negotiate_server() { None, ); let now = Instant::now(); + let mut buf = BytesMut::with_capacity(server.config().get_max_udp_payload_size() as usize); let event = server.handle( now, client_addr, @@ -41,14 +42,15 @@ fn version_negotiate_server() { None, // Long-header packet with reserved version number hex!("80 0a1a2a3a 04 00000000 04 00000000 00")[..].into(), + &mut buf, ); - let Some(DatagramEvent::Response(Transmit { contents, .. })) = event else { + let Some(DatagramEvent::Response(Transmit { .. })) = event else { panic!("expected a response"); }; - assert_ne!(contents[0] & 0x80, 0); - assert_eq!(&contents[1..15], hex!("00000000 04 00000000 04 00000000")); - assert!(contents[15..].chunks(4).any(|x| { + assert_ne!(buf[0] & 0x80, 0); + assert_eq!(&buf[1..15], hex!("00000000 04 00000000 04 00000000")); + assert!(buf[15..].chunks(4).any(|x| { DEFAULT_SUPPORTED_VERSIONS.contains(&u32::from_be_bytes(x.try_into().unwrap())) })); } @@ -74,6 +76,7 @@ fn version_negotiate_client() { .connect(Instant::now(), client_config(), server_addr, "localhost") .unwrap(); let now = Instant::now(); + let mut buf = BytesMut::with_capacity(client.config().get_max_udp_payload_size() as usize); let opt_event = client.handle( now, server_addr, @@ -85,6 +88,7 @@ fn version_negotiate_client() { 0a1a2a3a" )[..] .into(), + &mut buf, ); if let Some(DatagramEvent::ConnectionEvent(_, event)) = opt_event { client_ch.handle_event(event); @@ -1937,12 +1941,14 @@ fn malformed_token_len() { true, None, ); + let mut buf = BytesMut::with_capacity(server.config().get_max_udp_payload_size() as usize); server.handle( Instant::now(), client_addr, None, None, hex!("8900 0000 0101 0000 1b1b 841b 0000 0000 3f00")[..].into(), + &mut buf, ); } diff --git a/quinn-proto/src/tests/util.rs b/quinn-proto/src/tests/util.rs index 688b2794a..cd7bb1123 100644 --- a/quinn-proto/src/tests/util.rs +++ b/quinn-proto/src/tests/util.rs @@ -136,28 +136,26 @@ impl Pair { let span = info_span!("client"); let _guard = span.enter(); self.client.drive(self.time, self.server.addr); - for x in self.client.outbound.drain(..) { - if packet_size(&x) > self.mtu { - info!( - packet_size = packet_size(&x), - "dropping packet (max size exceeded)" - ); + for (packet, buffer) in self.client.outbound.drain(..) { + let packet_size = packet_size(&packet, &buffer); + if packet_size > self.mtu { + info!(packet_size, "dropping packet (max size exceeded)"); continue; } - if x.contents[0] & packet::LONG_HEADER_FORM == 0 { - let spin = x.contents[0] & packet::SPIN_BIT != 0; + if buffer[0] & packet::LONG_HEADER_FORM == 0 { + let spin = buffer[0] & packet::SPIN_BIT != 0; self.spins += (spin == self.last_spin) as u64; self.last_spin = spin; } if let Some(ref socket) = self.client.socket { - socket.send_to(&x.contents, x.destination).unwrap(); + socket.send_to(&buffer, packet.destination).unwrap(); } - if self.server.addr == x.destination { - let ecn = set_congestion_experienced(x.ecn, self.congestion_experienced); + if self.server.addr == packet.destination { + let ecn = set_congestion_experienced(packet.ecn, self.congestion_experienced); self.server.inbound.push_back(( self.time + self.latency, ecn, - x.contents.as_ref().into(), + buffer.as_ref().into(), )); } } @@ -167,23 +165,21 @@ impl Pair { let span = info_span!("server"); let _guard = span.enter(); self.server.drive(self.time, self.client.addr); - for x in self.server.outbound.drain(..) { - if packet_size(&x) > self.mtu { - info!( - packet_size = packet_size(&x), - "dropping packet (max size exceeded)" - ); + for (packet, buffer) in self.server.outbound.drain(..) { + let packet_size = packet_size(&packet, &buffer); + if packet_size > self.mtu { + info!(packet_size, "dropping packet (max size exceeded)"); continue; } if let Some(ref socket) = self.server.socket { - socket.send_to(&x.contents, x.destination).unwrap(); + socket.send_to(&buffer, packet.destination).unwrap(); } - if self.client.addr == x.destination { - let ecn = set_congestion_experienced(x.ecn, self.congestion_experienced); + if self.client.addr == packet.destination { + let ecn = set_congestion_experienced(packet.ecn, self.congestion_experienced); self.client.inbound.push_back(( self.time + self.latency, ecn, - x.contents.as_ref().into(), + buffer.as_ref().into(), )); } } @@ -288,8 +284,8 @@ pub(super) struct TestEndpoint { pub(super) addr: SocketAddr, socket: Option, timeout: Option, - pub(super) outbound: VecDeque, - delayed: VecDeque, + pub(super) outbound: VecDeque<(Transmit, Bytes)>, + delayed: VecDeque<(Transmit, Bytes)>, pub(super) inbound: VecDeque<(Instant, Option, BytesMut)>, accepted: Option, pub(super) connections: HashMap, @@ -334,10 +330,15 @@ impl TestEndpoint { } } } + let buffer_size = self.endpoint.config().get_max_udp_payload_size() as usize; + let mut buf = BytesMut::with_capacity(buffer_size); while self.inbound.front().map_or(false, |x| x.0 <= now) { let (recv_time, ecn, packet) = self.inbound.pop_front().unwrap(); - if let Some(event) = self.endpoint.handle(recv_time, remote, None, ecn, packet) { + if let Some(event) = self + .endpoint + .handle(recv_time, remote, None, ecn, packet, &mut buf) + { match event { DatagramEvent::NewConnection(ch, conn) => { self.connections.insert(ch, conn); @@ -352,7 +353,9 @@ impl TestEndpoint { self.conn_events.entry(ch).or_default().push_back(event); } DatagramEvent::Response(transmit) => { - self.outbound.extend(split_transmit(transmit)); + let size = transmit.size; + self.outbound + .extend(split_transmit(transmit, buf.split_to(size).freeze())); } } } @@ -375,9 +378,10 @@ impl TestEndpoint { while let Some(event) = conn.poll_endpoint_events() { endpoint_events.push((*ch, event)); } - - while let Some(x) = conn.poll_transmit(now, MAX_DATAGRAMS) { - self.outbound.extend(split_transmit(x)); + while let Some(transmit) = conn.poll_transmit(now, MAX_DATAGRAMS, &mut buf) { + let size = transmit.size; + self.outbound + .extend(split_transmit(transmit, buf.split_to(size).freeze())); } self.timeout = conn.poll_timeout(); } @@ -520,35 +524,38 @@ pub(super) fn min_opt(x: Option, y: Option) -> Option { /// The maximum of datagrams TestEndpoint will produce via `poll_transmit` const MAX_DATAGRAMS: usize = 10; -fn split_transmit(mut transmit: Transmit) -> Vec { +fn split_transmit(transmit: Transmit, mut buffer: Bytes) -> Vec<(Transmit, Bytes)> { let segment_size = match transmit.segment_size { Some(segment_size) => segment_size, - _ => return vec![transmit], + _ => return vec![(transmit, buffer)], }; let mut transmits = Vec::new(); - while !transmit.contents.is_empty() { - let end = segment_size.min(transmit.contents.len()); - - let contents = transmit.contents.split_to(end); - transmits.push(Transmit { - destination: transmit.destination, - ecn: transmit.ecn, + while !buffer.is_empty() { + let end = segment_size.min(buffer.len()); + + let contents = buffer.split_to(end); + transmits.push(( + Transmit { + destination: transmit.destination, + size: buffer.len(), + ecn: transmit.ecn, + segment_size: None, + src_ip: transmit.src_ip, + }, contents, - segment_size: None, - src_ip: transmit.src_ip, - }); + )); } transmits } -fn packet_size(transmit: &Transmit) -> usize { +fn packet_size(transmit: &Transmit, buffer: &Bytes) -> usize { if transmit.segment_size.is_some() { panic!("This transmit is meant to be split into multiple packets!"); } - transmit.contents.len() + buffer.len() } fn set_congestion_experienced( diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 848f04618..ef6f0dba7 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -10,7 +10,7 @@ use std::{ }; use crate::runtime::{AsyncTimer, AsyncUdpSocket, Runtime}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use pin_project_lite::pin_project; use proto::{ConnectionError, ConnectionHandle, ConnectionStats, Dir, StreamEvent, StreamId}; use rustc_hash::FxHashMap; @@ -857,16 +857,20 @@ impl State { let mut transmits = 0; let max_datagrams = self.socket.max_transmit_segments(); + let capacity = self.inner.current_mtu(); + let mut buffer = BytesMut::with_capacity(capacity as usize); - while let Some(t) = self.inner.poll_transmit(now, max_datagrams) { + while let Some(t) = self.inner.poll_transmit(now, max_datagrams, &mut buffer) { transmits += match t.segment_size { None => 1, - Some(s) => (t.contents.len() + s - 1) / s, // round up + Some(s) => (t.size + s - 1) / s, // round up }; // If the endpoint driver is gone, noop. - let _ = self - .endpoint_events - .send((self.handle, EndpointEvent::Transmit(t))); + let size = t.size; + let _ = self.endpoint_events.send(( + self.handle, + EndpointEvent::Transmit(t, buffer.split_to(size).freeze()), + )); if transmits >= MAX_TRANSMIT_DATAGRAMS { // TODO: What isn't ideal here yet is that if we don't poll all diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index 38dee2a9c..3a6260bee 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -408,6 +408,8 @@ impl State { .write(IoSliceMut::<'a>::new(buf)); }); let mut iovs = unsafe { iovs.assume_init() }; + let buffer_size = self.inner.config().get_max_udp_payload_size() as usize; + let mut buffer = BytesMut::with_capacity(buffer_size); loop { match self.socket.poll_recv(cx, &mut iovs, &mut metas) { Poll::Ready(Ok(msgs)) => { @@ -422,6 +424,7 @@ impl State { meta.dst_ip, meta.ecn.map(proto_ecn), buf, + &mut buffer, ) { Some(DatagramEvent::NewConnection(handle, conn)) => { let conn = self.connections.insert( @@ -441,7 +444,7 @@ impl State { .unwrap() .send(ConnectionEvent::Proto(event)); } - Some(DatagramEvent::Response(t)) => { + Some(DatagramEvent::Response(transmit)) => { // Limiting the memory usage for items queued in the outgoing queue from endpoint // generated packets. Otherwise, we may see a build-up of the queue under test with // flood of initial packets against the endpoint. The sender with the sender-limiter @@ -449,8 +452,11 @@ impl State { if self.transmit_queue_contents_len < MAX_TRANSMIT_QUEUE_CONTENTS_LEN { - let contents_len = t.contents.len(); - self.outgoing.push_back(udp_transmit(t)); + let contents_len = transmit.size; + self.outgoing.push_back(udp_transmit( + transmit, + buffer.split_to(contents_len).freeze(), + )); self.transmit_queue_contents_len = self .transmit_queue_contents_len .saturating_add(contents_len); @@ -521,7 +527,6 @@ impl State { fn handle_events(&mut self, cx: &mut Context, shared: &Shared) -> bool { use EndpointEvent::*; - for _ in 0..IO_LOOP_BOUND { match self.events.poll_recv(cx) { Poll::Ready(Some((ch, event))) => match event { @@ -542,9 +547,9 @@ impl State { .send(ConnectionEvent::Proto(event)); } } - Transmit(t) => { - let contents_len = t.contents.len(); - self.outgoing.push_back(udp_transmit(t)); + Transmit(t, buf) => { + let contents_len = buf.len(); + self.outgoing.push_back(udp_transmit(t, buf)); self.transmit_queue_contents_len = self .transmit_queue_contents_len .saturating_add(contents_len); @@ -562,11 +567,11 @@ impl State { } #[inline] -fn udp_transmit(t: proto::Transmit) -> udp::Transmit { +fn udp_transmit(t: proto::Transmit, buffer: Bytes) -> udp::Transmit { udp::Transmit { destination: t.destination, ecn: t.ecn.map(udp_ecn), - contents: t.contents, + contents: buffer, segment_size: t.segment_size, src_ip: t.src_ip, } diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs index 76b08c054..05221548a 100644 --- a/quinn/src/lib.rs +++ b/quinn/src/lib.rs @@ -60,6 +60,7 @@ mod runtime; mod send_stream; mod work_limiter; +use bytes::Bytes; pub use proto::{ congestion, crypto, AckFrequencyConfig, ApplicationClose, Chunk, ClientConfig, ConfigError, ConnectError, ConnectionClose, ConnectionError, EndpointConfig, IdleTimeout, @@ -98,7 +99,7 @@ enum ConnectionEvent { #[derive(Debug)] enum EndpointEvent { Proto(proto::EndpointEvent), - Transmit(proto::Transmit), + Transmit(proto::Transmit, Bytes), } /// Maximum number of datagrams processed in send/recv calls to make before moving on to other processing