Skip to content

Commit

Permalink
perf: don't allocate in UDP recv path
Browse files Browse the repository at this point in the history
Previously `neqo-udp` would have one long-lived receive buffer, but after
reading into the buffer from the socket, it would allocate a new `Vec` for each
UDP segment.

This commit does not allocate each UDP segment in a new `Vec`, but instead
passes the single re-used receive buffer to
`neqo_transport::Connection::process_input` directly.

Part of mozilla#1693.
  • Loading branch information
mxinden committed Aug 27, 2024
1 parent 910a7cd commit 596f477
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 126 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,5 +136,6 @@ jobs:

bench:
name: "Benchmark"
needs: [check]
# TODO
# needs: [check]
uses: ./.github/workflows/bench.yml
27 changes: 22 additions & 5 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ struct Runner<'a, H: Handler> {
handler: H,
timeout: Option<Pin<Box<Sleep>>>,
args: &'a Args,
recv_buf: Option<Vec<u8>>,
}

impl<'a, H: Handler> Runner<'a, H> {
Expand Down Expand Up @@ -445,12 +446,26 @@ impl<'a, H: Handler> Runner<'a, H> {

async fn process_multiple_input(&mut self) -> Res<()> {
loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
}
// TODO: big hack
let dgram = match self.socket.recv(
&self.local_addr,
self.recv_buf.take().expect("recv_buf not to be taken"),
) {
Ok(Ok(d)) => d,
Ok(Err(recv_buf)) => {
self.recv_buf = Some(recv_buf);
break;
}
Err((e, recv_buf)) => {
self.recv_buf = Some(recv_buf);
return Err(e.into());
}
};

self.client
.process_multiple_input(dgrams.iter(), Instant::now());
// TODO
.process_multiple_input(std::iter::once(&dgram), Instant::now());
self.recv_buf = Some(dgram.into_recv_buf());
self.process_output().await?;
}

Expand Down Expand Up @@ -568,6 +583,7 @@ pub async fn client(mut args: Args) -> Res<()> {
local_addr: real_local,
socket: &mut socket,
timeout: None,
recv_buf: Some(Vec::with_capacity(neqo_udp::RECV_BUF_SIZE)),
}
.run()
.await?
Expand All @@ -584,6 +600,7 @@ pub async fn client(mut args: Args) -> Res<()> {
local_addr: real_local,
socket: &mut socket,
timeout: None,
recv_buf: Some(Vec::with_capacity(neqo_udp::RECV_BUF_SIZE)),
}
.run()
.await?
Expand Down
26 changes: 19 additions & 7 deletions neqo-bin/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ pub struct ServerRunner {
server: Box<dyn HttpServer>,
timeout: Option<Pin<Box<Sleep>>>,
sockets: Vec<(SocketAddr, crate::udp::Socket)>,
recv_buf: Option<Vec<u8>>,
}

impl ServerRunner {
Expand All @@ -219,6 +220,7 @@ impl ServerRunner {
server,
timeout: None,
sockets,
recv_buf: Some(Vec::with_capacity(neqo_udp::RECV_BUF_SIZE)),
}
}

Expand Down Expand Up @@ -289,13 +291,23 @@ impl ServerRunner {
match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let dgrams = socket.recv(host)?;
if dgrams.is_empty() {
break;
}
for dgram in dgrams {
self.process(Some(&dgram)).await?;
}
// TODO: big hack
let dgram = match socket.recv(
host,
self.recv_buf.take().expect("recv_buf not to be taken"),
) {
Ok(Ok(d)) => d,
Ok(Err(recv_buf)) => {
self.recv_buf = Some(recv_buf);
break;
}
Err((e, recv_buf)) => {
self.recv_buf = Some(recv_buf);
return Err(e.into());
}
};
self.process(Some(&dgram)).await?;
self.recv_buf = Some(dgram.into_recv_buf());
},
Ready::Timeout => {
self.timeout = None;
Expand Down
31 changes: 25 additions & 6 deletions neqo-bin/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,35 @@ impl Socket {

/// Receive a batch of [`Datagram`]s on the given [`Socket`], each set with
/// the provided local address.
pub fn recv(&self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> {
pub fn recv(
&self,
local_address: &SocketAddr,
recv_buf: Vec<u8>,
) -> Result<Result<Datagram, Vec<u8>>, (io::Error, Vec<u8>)> {
// TODO: big hack!
let mut recv_buf = Some(recv_buf);
self.inner
.try_io(tokio::io::Interest::READABLE, || {
neqo_udp::recv_inner(local_address, &self.state, &self.inner)
})
.try_io(
tokio::io::Interest::READABLE,
|| match neqo_udp::recv_inner_2(
local_address,
&self.state,
&self.inner,
recv_buf.take().unwrap(),
) {
Ok(d) => return Ok(d),
Err((e, buf)) => {
recv_buf = Some(buf);
return Err(e);
}
},
)
.map(Ok)
.or_else(|e| {
if e.kind() == io::ErrorKind::WouldBlock {
Ok(vec![])
Ok(Err(recv_buf.take().unwrap()))
} else {
Err(e)
Err((e, recv_buf.take().unwrap()))
}
})
}
Expand Down
28 changes: 28 additions & 0 deletions neqo-common/src/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub struct Datagram {
src: SocketAddr,
dst: SocketAddr,
tos: IpTos,
/// The segment size if this transmission contains multiple datagrams.
/// This is `None` if the [`Datagram`] only contains a single datagram
segment_size: Option<usize>,
d: Vec<u8>,
}

Expand All @@ -22,6 +25,23 @@ impl Datagram {
src,
dst,
tos,
segment_size: None,
d: d.into(),
}
}

pub fn new_with_segment_size<V: Into<Vec<u8>>>(
src: SocketAddr,
dst: SocketAddr,
tos: IpTos,
segment_size: usize,
d: V,
) -> Self {
Self {
src,
dst,
tos,
segment_size: Some(segment_size),
d: d.into(),
}
}
Expand All @@ -44,6 +64,14 @@ impl Datagram {
pub fn set_tos(&mut self, tos: IpTos) {
self.tos = tos;
}

pub fn into_recv_buf(self) -> Vec<u8> {
self.d
}

pub fn segment_size(&self) -> Option<usize> {
self.segment_size
}
}

impl Deref for Datagram {
Expand Down
6 changes: 1 addition & 5 deletions neqo-crypto/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,7 @@ fn setup_standalone(nss: &str) -> Vec<String> {
"cargo:rustc-link-search=native={}",
nsslibdir.to_str().unwrap()
);
if is_debug() || env::consts::OS == "windows" {
static_link();
} else {
dynamic_link();
}
static_link();

let mut flags: Vec<String> = Vec::new();
for i in includes {
Expand Down
Loading

0 comments on commit 596f477

Please sign in to comment.