Skip to content

Commit

Permalink
Replace BytesMut transmit buffers with Vec
Browse files Browse the repository at this point in the history
We no longer need to share ownership of this memory, so we should use
a simpler type to reflect our simpler requirements.
  • Loading branch information
Ralith committed Apr 7, 2024
1 parent bca0728 commit 4e7281a
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 66 deletions.
4 changes: 2 additions & 2 deletions quinn-proto/src/connection/datagrams.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::VecDeque;

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use thiserror::Error;
use tracing::{debug, trace};

Expand Down Expand Up @@ -127,7 +127,7 @@ impl DatagramState {
Ok(was_empty)
}

pub(super) fn write(&mut self, buf: &mut BytesMut, max_size: usize) -> bool {
pub(super) fn write(&mut self, buf: &mut Vec<u8>, max_size: usize) -> bool {
let datagram = match self.outgoing.pop_front() {
Some(x) => x,
None => return false,
Expand Down
6 changes: 3 additions & 3 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ impl Connection {
&mut self,
now: Instant,
max_datagrams: usize,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
) -> Option<Transmit> {
assert!(max_datagrams != 0);
let max_datagrams = match self.config.enable_segmentation_offload {
Expand Down Expand Up @@ -2952,7 +2952,7 @@ impl Connection {
&mut self,
now: Instant,
space_id: SpaceId,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
max_size: usize,
pn: u64,
) -> SentFrames {
Expand Down Expand Up @@ -3171,7 +3171,7 @@ impl Connection {
receiving_ecn: bool,
sent: &mut SentFrames,
space: &mut PacketSpace,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
stats: &mut ConnectionStats,
) {
debug_assert!(!space.pending_acks.ranges().is_empty());
Expand Down
8 changes: 4 additions & 4 deletions quinn-proto/src/connection/packet_builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Instant;

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use rand::Rng;
use tracing::{trace, trace_span};

Expand Down Expand Up @@ -36,7 +36,7 @@ impl PacketBuilder {
pub(super) fn new(
now: Instant,
space_id: SpaceId,
buffer: &mut BytesMut,
buffer: &mut Vec<u8>,
buffer_capacity: usize,
datagram_start: usize,
ack_eliciting: bool,
Expand Down Expand Up @@ -178,7 +178,7 @@ impl PacketBuilder {
now: Instant,
conn: &mut Connection,
sent: Option<SentFrames>,
buffer: &mut BytesMut,
buffer: &mut Vec<u8>,
) {
let ack_eliciting = self.ack_eliciting;
let exact_number = self.exact_number;
Expand Down Expand Up @@ -221,7 +221,7 @@ impl PacketBuilder {
}

/// Encrypt packet, returning the length of the packet and whether padding was added
pub(super) fn finish(self, conn: &mut Connection, buffer: &mut BytesMut) -> (usize, bool) {
pub(super) fn finish(self, conn: &mut Connection, buffer: &mut Vec<u8>) -> (usize, bool) {
let pad = buffer.len() < self.min_size;
if pad {
trace!("PADDING * {}", self.min_size - buffer.len());
Expand Down
12 changes: 6 additions & 6 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
mem,
};

use bytes::{BufMut, BytesMut};
use bytes::BufMut;
use rustc_hash::FxHashMap;
use tracing::{debug, trace};

Expand Down Expand Up @@ -365,7 +365,7 @@ impl StreamsState {

pub(in crate::connection) fn write_control_frames(
&mut self,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
pending: &mut Retransmits,
retransmits: &mut ThinRetransmits,
stats: &mut FrameStats,
Expand Down Expand Up @@ -491,7 +491,7 @@ impl StreamsState {

pub(crate) fn write_stream_frames(
&mut self,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
max_buf_size: usize,
) -> StreamMetaVec {
let mut stream_frames = StreamMetaVec::new();
Expand Down Expand Up @@ -925,7 +925,7 @@ mod tests {
connection::State as ConnState, connection::Streams, ReadableError, RecvStream, SendStream,
TransportErrorCode, WriteError,
};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;

fn make(side: Side) -> StreamsState {
StreamsState::new(
Expand Down Expand Up @@ -1317,7 +1317,7 @@ mod tests {
high.set_priority(1).unwrap();
high.write(b"high").unwrap();

let mut buf = BytesMut::with_capacity(40);
let mut buf = Vec::with_capacity(40);
let meta = server.write_stream_frames(&mut buf, 40);
assert_eq!(meta[0].id, id_high);
assert_eq!(meta[1].id, id_mid);
Expand Down Expand Up @@ -1376,7 +1376,7 @@ mod tests {
};
high.set_priority(-1).unwrap();

let mut buf = BytesMut::with_capacity(1000);
let mut buf = Vec::with_capacity(1000);
let meta = server.write_stream_frames(&mut buf, 40);
assert_eq!(meta.len(), 1);
assert_eq!(meta[0].id, id_high);
Expand Down
18 changes: 7 additions & 11 deletions quinn-proto/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl Endpoint {
local_ip: Option<IpAddr>,
ecn: Option<EcnCodepoint>,
data: BytesMut,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
) -> Option<DatagramEvent> {
let datagram_len = data.len();
let (first_decode, remaining) = match PartialDecode::new(
Expand Down Expand Up @@ -298,7 +298,7 @@ impl Endpoint {
inciting_dgram_len: usize,
addresses: FourTuple,
dst_cid: &ConnectionId,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
) -> Option<Transmit> {
if self
.last_stateless_reset
Expand Down Expand Up @@ -444,7 +444,7 @@ impl Endpoint {
packet: Packet,
rest: Option<BytesMut>,
crypto: Keys,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
) -> Option<DatagramEvent> {
if !packet.reserved_bits_valid() {
debug!("dropping connection attempt with invalid reserved bits");
Expand Down Expand Up @@ -505,7 +505,7 @@ impl Endpoint {
&mut self,
mut incoming: Incoming,
now: Instant,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
) -> Result<(ConnectionHandle, Connection), AcceptError> {
let packet_number = incoming.packet.header.number.expand(0);
let InitialHeader {
Expand Down Expand Up @@ -642,7 +642,7 @@ impl Endpoint {
}

/// Reject this incoming connection attempt
pub fn refuse(&mut self, incoming: Incoming, buf: &mut BytesMut) -> Transmit {
pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
self.initial_close(
incoming.packet.header.version,
incoming.addresses,
Expand All @@ -656,11 +656,7 @@ impl Endpoint {
/// Respond with a retry packet, requiring the client to retry with address validation
///
/// Errors if `incoming.remote_address_validated()` is true.
pub fn retry(
&mut self,
incoming: Incoming,
buf: &mut BytesMut,
) -> Result<Transmit, RetryError> {
pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
if incoming.remote_address_validated() {
return Err(RetryError(incoming));
}
Expand Down Expand Up @@ -766,7 +762,7 @@ impl Endpoint {
crypto: &Keys,
remote_id: &ConnectionId,
reason: TransportError,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
) -> Transmit {
// We don't need to worry about CID collisions in initial closes because the peer
// shouldn't respond, and if it does, and the CID collides, we'll just drop the
Expand Down
4 changes: 2 additions & 2 deletions quinn-proto/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
ops::{Range, RangeInclusive},
};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use bytes::{Buf, BufMut, Bytes};
use tinyvec::TinyVec;

use crate::{
Expand Down Expand Up @@ -892,7 +892,7 @@ impl FrameStruct for Datagram {
}

impl Datagram {
pub(crate) fn encode(&self, length: bool, out: &mut BytesMut) {
pub(crate) fn encode(&self, length: bool, out: &mut Vec<u8>) {
out.write(Type(*DATAGRAM_TYS.start() | u64::from(length))); // 1 byte
if length {
// Safe to unwrap because we check length sanity before queueing datagrams
Expand Down
6 changes: 3 additions & 3 deletions quinn-proto/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ pub(crate) enum Header {
}

impl Header {
pub(crate) fn encode(&self, w: &mut BytesMut) -> PartialEncode {
pub(crate) fn encode(&self, w: &mut Vec<u8>) -> PartialEncode {
use self::Header::*;
let start = w.len();
match *self {
Expand Down Expand Up @@ -872,7 +872,7 @@ mod tests {

let dcid = ConnectionId::new(&hex!("06b858ec6f80452b"));
let client = initial_keys(Version::V1, &dcid, Side::Client);
let mut buf = BytesMut::new();
let mut buf = Vec::new();
let header = Header::Initial(InitialHeader {
number: PacketNumber::U8(0),
src_cid: ConnectionId::new(&[]),
Expand Down Expand Up @@ -903,7 +903,7 @@ mod tests {

let server = initial_keys(Version::V1, &dcid, Side::Server);
let supported_versions = DEFAULT_SUPPORTED_VERSIONS.to_vec();
let decode = PartialDecode::new(buf, 0, &supported_versions, false)
let decode = PartialDecode::new(buf.as_slice().into(), 0, &supported_versions, false)
.unwrap()
.0;
let mut packet = decode.finish(Some(&*server.header.remote)).unwrap();
Expand Down
10 changes: 5 additions & 5 deletions quinn-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use assert_matches::assert_matches;
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use hex_literal::hex;
use rand::RngCore;
use ring::hmac;
Expand All @@ -34,7 +34,7 @@ fn version_negotiate_server() {
None,
);
let now = Instant::now();
let mut buf = BytesMut::with_capacity(server.config().get_max_udp_payload_size() as usize);
let mut buf = Vec::with_capacity(server.config().get_max_udp_payload_size() as usize);
let event = server.handle(
now,
client_addr,
Expand Down Expand Up @@ -76,7 +76,7 @@ fn version_negotiate_client() {
.connect(Instant::now(), client_config(), server_addr, "localhost")
.unwrap();
let now = Instant::now();
let mut buf = BytesMut::with_capacity(client.config().get_max_udp_payload_size() as usize);
let mut buf = Vec::with_capacity(client.config().get_max_udp_payload_size() as usize);
let opt_event = client.handle(
now,
server_addr,
Expand Down Expand Up @@ -248,7 +248,7 @@ fn stateless_reset_limit() {
None,
);
let time = Instant::now();
let mut buf = BytesMut::new();
let mut buf = Vec::new();
let event = endpoint.handle(time, remote, None, None, [0u8; 1024][..].into(), &mut buf);
assert!(matches!(event, Some(DatagramEvent::Response(_))));
let event = endpoint.handle(time, remote, None, None, [0u8; 1024][..].into(), &mut buf);
Expand Down Expand Up @@ -1978,7 +1978,7 @@ fn malformed_token_len() {
true,
None,
);
let mut buf = BytesMut::with_capacity(server.config().get_max_udp_payload_size() as usize);
let mut buf = Vec::with_capacity(server.config().get_max_udp_payload_size() as usize);
server.handle(
Instant::now(),
client_addr,
Expand Down
30 changes: 14 additions & 16 deletions quinn-proto/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl TestEndpoint {
}
}
let buffer_size = self.endpoint.config().get_max_udp_payload_size() as usize;
let mut buf = BytesMut::with_capacity(buffer_size);
let mut buf = Vec::with_capacity(buffer_size);

while self.inbound.front().map_or(false, |x| x.0 <= now) {
let (recv_time, ecn, packet) = self.inbound.pop_front().unwrap();
Expand Down Expand Up @@ -381,8 +381,8 @@ impl TestEndpoint {
}
DatagramEvent::Response(transmit) => {
let size = transmit.size;
self.outbound
.extend(split_transmit(transmit, buf.split_to(size).freeze()));
self.outbound.extend(split_transmit(transmit, &buf[..size]));
buf.clear();
}
}
}
Expand All @@ -391,7 +391,7 @@ impl TestEndpoint {

pub(super) fn drive_outgoing(&mut self, now: Instant) {
let buffer_size = self.endpoint.config().get_max_udp_payload_size() as usize;
let mut buf = BytesMut::with_capacity(buffer_size);
let mut buf = Vec::with_capacity(buffer_size);

loop {
let mut endpoint_events: Vec<(ConnectionHandle, EndpointEvent)> = vec![];
Expand All @@ -412,8 +412,8 @@ impl TestEndpoint {
}
while let Some(transmit) = conn.poll_transmit(now, MAX_DATAGRAMS, &mut buf) {
let size = transmit.size;
self.outbound
.extend(split_transmit(transmit, buf.split_to(size).freeze()));
self.outbound.extend(split_transmit(transmit, &buf[..size]));
buf.clear();
}
self.timeout = conn.poll_timeout();
}
Expand Down Expand Up @@ -455,7 +455,7 @@ impl TestEndpoint {
incoming: Incoming,
now: Instant,
) -> Result<ConnectionHandle, ConnectionError> {
let mut buf = BytesMut::new();
let mut buf = Vec::new();
match self.endpoint.accept(incoming, now, &mut buf) {
Ok((ch, conn)) => {
self.connections.insert(ch, conn);
Expand All @@ -465,8 +465,7 @@ impl TestEndpoint {
Err(error) => {
if let Some(transmit) = error.response {
let size = transmit.size;
self.outbound
.extend(split_transmit(transmit, buf.split_to(size).freeze()));
self.outbound.extend(split_transmit(transmit, &buf[..size]));
}
self.accepted = Some(Err(error.cause.clone()));
Err(error.cause)
Expand All @@ -475,19 +474,17 @@ impl TestEndpoint {
}

pub(super) fn retry(&mut self, incoming: Incoming) {
let mut buf = BytesMut::new();
let mut buf = Vec::new();
let transmit = self.endpoint.retry(incoming, &mut buf).unwrap();
let size = transmit.size;
self.outbound
.extend(split_transmit(transmit, buf.split_to(size).freeze()));
self.outbound.extend(split_transmit(transmit, &buf[..size]));
}

pub(super) fn reject(&mut self, incoming: Incoming) {
let mut buf = BytesMut::new();
let mut buf = Vec::new();
let transmit = self.endpoint.refuse(incoming, &mut buf);
let size = transmit.size;
self.outbound
.extend(split_transmit(transmit, buf.split_to(size).freeze()));
self.outbound.extend(split_transmit(transmit, &buf[..size]));
}

pub(super) fn assert_accept(&mut self) -> ConnectionHandle {
Expand Down Expand Up @@ -606,7 +603,8 @@ pub(super) fn min_opt<T: Ord>(x: Option<T>, y: Option<T>) -> Option<T> {
/// The maximum of datagrams TestEndpoint will produce via `poll_transmit`
const MAX_DATAGRAMS: usize = 10;

fn split_transmit(transmit: Transmit, mut buffer: Bytes) -> Vec<(Transmit, Bytes)> {
fn split_transmit(transmit: Transmit, buffer: &[u8]) -> Vec<(Transmit, Bytes)> {
let mut buffer = Bytes::copy_from_slice(buffer);
let segment_size = match transmit.segment_size {
Some(segment_size) => segment_size,
_ => return vec![(transmit, buffer)],
Expand Down
Loading

0 comments on commit 4e7281a

Please sign in to comment.