From bdc183bcf8665978fc90ca7b0dcf124b282ecd0a Mon Sep 17 00:00:00 2001 From: Kershaw Chang Date: Fri, 22 Dec 2023 16:06:53 +0800 Subject: [PATCH 1/2] Enhance socket read to handle multiple packets --- neqo-client/Cargo.toml | 1 + neqo-client/src/main.rs | 181 +++++++++++++++++++++++++--------------- 2 files changed, 116 insertions(+), 66 deletions(-) diff --git a/neqo-client/Cargo.toml b/neqo-client/Cargo.toml index 06b131a6a2..4d4f1b67db 100644 --- a/neqo-client/Cargo.toml +++ b/neqo-client/Cargo.toml @@ -17,6 +17,7 @@ neqo-qpack = { path = "./../neqo-qpack" } structopt = "0.3.7" url = "2.0" qlog = "0.10.0" +mio = "0.6.17" [features] deny-warnings = [] diff --git a/neqo-client/src/main.rs b/neqo-client/src/main.rs index a72d78890a..e7af2a3779 100644 --- a/neqo-client/src/main.rs +++ b/neqo-client/src/main.rs @@ -9,6 +9,8 @@ use qlog::{events::EventImportance, streamer::QlogStreamer}; +use mio::{net::UdpSocket, Events, Poll, PollOpt, Ready, Token}; + use neqo_common::{self as common, event::Provider, hex, qlog::NeqoQlog, Datagram, Role}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, @@ -30,7 +32,7 @@ use std::{ fmt::{self, Display}, fs::{create_dir_all, File, OpenOptions}, io::{self, ErrorKind, Write}, - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket}, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}, path::PathBuf, process::exit, rc::Rc, @@ -338,8 +340,8 @@ impl QuicParameters { } } -fn emit_datagram(socket: &UdpSocket, d: Datagram) -> io::Result<()> { - let sent = socket.send_to(&d[..], d.destination())?; +fn emit_datagram(socket: &mio::net::UdpSocket, d: Datagram) -> io::Result<()> { + let sent = socket.send_to(&d[..], &d.destination())?; if sent != d.len() { eprintln!("Unable to send all {} bytes of datagram", d.len()); } @@ -391,36 +393,71 @@ fn get_output_file( fn process_loop( local_addr: &SocketAddr, socket: &UdpSocket, + poll: &Poll, client: &mut Http3Client, handler: &mut Handler, ) -> Res { let buf = &mut [0u8; 2048]; + let mut events = Events::with_capacity(1024); + let mut timeout: Option = None; loop { + poll.poll( + &mut events, + if timeout.is_some() { + timeout + } else { + Some(Duration::from_millis(0)) + }, + )?; + + 'read: loop { + match socket.recv_from(&mut buf[..]) { + Err(ref err) => { + if err.kind() == ErrorKind::WouldBlock || err.kind() == ErrorKind::Interrupted { + break 'read; + } + + eprintln!("UDP error: {}", err); + exit(1) + } + Ok((sz, remote)) => { + if sz == buf.len() { + eprintln!("Received more than {} bytes", buf.len()); + break 'read; + } + if sz > 0 { + let d = Datagram::new(remote, *local_addr, &buf[..sz]); + client.process_input(d, Instant::now()); + handler.maybe_key_update(client)?; + } + } + }; + } + if let Http3State::Closed(..) = client.state() { return Ok(client.state()); } let mut exiting = !handler.handle(client)?; - loop { + 'write: loop { match client.process_output(Instant::now()) { Output::Datagram(dgram) => { if let Err(e) = emit_datagram(socket, dgram) { eprintln!("UDP write error: {}", e); client.close(Instant::now(), 0, e.to_string()); exiting = true; - break; + break 'write; } } - Output::Callback(duration) => { - socket.set_read_timeout(Some(duration)).unwrap(); - break; + Output::Callback(new_timeout) => { + timeout = Some(new_timeout); + break 'write; } Output::None => { // Not strictly necessary, since we're about to exit - socket.set_read_timeout(None).unwrap(); exiting = true; - break; + break 'write; } } } @@ -428,26 +465,6 @@ fn process_loop( if exiting { return Ok(client.state()); } - - match socket.recv_from(&mut buf[..]) { - Err(ref err) - if err.kind() == ErrorKind::WouldBlock || err.kind() == ErrorKind::Interrupted => {} - Err(err) => { - eprintln!("UDP error: {}", err); - exit(1) - } - Ok((sz, remote)) => { - if sz == buf.len() { - eprintln!("Received more than {} bytes", buf.len()); - continue; - } - if sz > 0 { - let d = Datagram::new(remote, *local_addr, &buf[..sz]); - client.process_input(d, Instant::now()); - handler.maybe_key_update(client)?; - } - } - }; } } @@ -802,6 +819,7 @@ fn handle_test( testcase: &String, args: &mut Args, socket: &UdpSocket, + poll: &Poll, local_addr: SocketAddr, remote_addr: SocketAddr, hostname: &str, @@ -823,7 +841,7 @@ fn handle_test( args, }; let mut h = Handler::new(url_handler, key_update, args.output_read_data); - process_loop(&local_addr, socket, &mut client, &mut h)?; + process_loop(&local_addr, socket, poll, &mut client, &mut h)?; } _ => { eprintln!("Unsupported test case: {}", testcase); @@ -877,9 +895,11 @@ fn create_http3_client( Ok(client) } +#[allow(clippy::too_many_arguments)] fn client( args: &mut Args, socket: &UdpSocket, + poll: &Poll, local_addr: SocketAddr, remote_addr: SocketAddr, hostname: &str, @@ -892,6 +912,7 @@ fn client( &testcase, args, socket, + poll, local_addr, remote_addr, hostname, @@ -912,7 +933,7 @@ fn client( }; let mut h = Handler::new(url_handler, key_update, args.output_read_data); - process_loop(&local_addr, socket, &mut client, &mut h)?; + process_loop(&local_addr, socket, poll, &mut client, &mut h)?; let token = if args.resume { // If we haven't received an event, take a token if there is one. @@ -1026,7 +1047,7 @@ fn main() -> Res<()> { SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::from([0; 16])), 0), }; - let socket = match UdpSocket::bind(local_addr) { + let socket = match UdpSocket::bind(&local_addr) { Err(e) => { eprintln!("Unable to bind UDP socket: {}", e); exit(1) @@ -1034,6 +1055,14 @@ fn main() -> Res<()> { Ok(s) => s, }; + let poll = Poll::new()?; + poll.register( + &socket, + Token(0), + Ready::readable() | Ready::writable(), + PollOpt::edge(), + )?; + let real_local = socket.local_addr().unwrap(); println!( "{} Client connecting: {:?} -> {:?}", @@ -1071,6 +1100,7 @@ fn main() -> Res<()> { old::old_client( &args, &socket, + &poll, real_local, remote_addr, &hostname, @@ -1081,6 +1111,7 @@ fn main() -> Res<()> { client( &mut args, &socket, + &poll, real_local, remote_addr, &hostname, @@ -1100,17 +1131,17 @@ mod old { collections::{HashMap, VecDeque}, fs::File, io::{ErrorKind, Write}, - net::{SocketAddr, UdpSocket}, + net::SocketAddr, path::PathBuf, process::exit, rc::Rc, - time::Instant, + time::{Duration, Instant}, }; use url::Url; use super::{qlog_new, KeyUpdateState, Res}; - + use mio::{Events, Poll}; use neqo_common::{event::Provider, Datagram}; use neqo_crypto::{AuthenticationStatus, ResumptionToken}; use neqo_transport::{ @@ -1304,37 +1335,73 @@ mod old { fn process_loop_old( local_addr: &SocketAddr, - socket: &UdpSocket, + socket: &mio::net::UdpSocket, + poll: &Poll, client: &mut Connection, handler: &mut HandlerOld, ) -> Res { let buf = &mut [0u8; 2048]; + let mut events = Events::with_capacity(1024); + let mut timeout: Option = None; loop { + poll.poll( + &mut events, + if timeout.is_some() { + timeout + } else { + Some(Duration::from_millis(0)) + }, + )?; + + 'read: loop { + match socket.recv_from(&mut buf[..]) { + Err(ref err) => { + if err.kind() == ErrorKind::WouldBlock + || err.kind() == ErrorKind::Interrupted + { + break 'read; + } + eprintln!("UDP error: {}", err); + exit(1) + } + Ok((sz, remote)) => { + if sz == buf.len() { + eprintln!("Received more than {} bytes", buf.len()); + break 'read; + } + if sz > 0 { + let d = Datagram::new(remote, *local_addr, &buf[..sz]); + client.process_input(d, Instant::now()); + handler.maybe_key_update(client)?; + } + } + }; + } + if let State::Closed(..) = client.state() { return Ok(client.state().clone()); } let mut exiting = !handler.handle(client)?; - loop { + 'write: loop { match client.process_output(Instant::now()) { Output::Datagram(dgram) => { if let Err(e) = emit_datagram(socket, dgram) { eprintln!("UDP write error: {}", e); client.close(Instant::now(), 0, e.to_string()); exiting = true; - break; + break 'write; } } - Output::Callback(duration) => { - socket.set_read_timeout(Some(duration)).unwrap(); - break; + Output::Callback(new_timeout) => { + timeout = Some(new_timeout); + break 'write; } Output::None => { // Not strictly necessary, since we're about to exit - socket.set_read_timeout(None).unwrap(); exiting = true; - break; + break 'write; } } } @@ -1342,32 +1409,14 @@ mod old { if exiting { return Ok(client.state().clone()); } - - match socket.recv_from(&mut buf[..]) { - Err(err) => { - if err.kind() != ErrorKind::WouldBlock && err.kind() != ErrorKind::Interrupted { - eprintln!("UDP error: {}", err); - exit(1); - } - } - Ok((sz, addr)) => { - if sz == buf.len() { - eprintln!("Received more than {} bytes", buf.len()); - continue; - } - if sz > 0 { - let d = Datagram::new(addr, *local_addr, &buf[..sz]); - client.process_input(d, Instant::now()); - handler.maybe_key_update(client)?; - } - } - } } } + #[allow(clippy::too_many_arguments)] pub fn old_client( args: &Args, - socket: &UdpSocket, + socket: &mio::net::UdpSocket, + poll: &Poll, local_addr: SocketAddr, remote_addr: SocketAddr, origin: &str, @@ -1410,7 +1459,7 @@ mod old { key_update, }; - process_loop_old(&local_addr, socket, &mut client, &mut h)?; + process_loop_old(&local_addr, socket, poll, &mut client, &mut h)?; let token = if args.resume { // If we haven't received an event, take a token if there is one. From 14ab5df3a52e956f2248d366c4226880dd7dda28 Mon Sep 17 00:00:00 2001 From: Kershaw Chang Date: Fri, 5 Jan 2024 12:15:49 +0800 Subject: [PATCH 2/2] Enhance socket read to handle multiple packets --- neqo-client/src/main.rs | 45 ++++++++++++++-------------- neqo-http3/src/connection_client.rs | 9 ++++++ neqo-transport/src/connection/mod.rs | 13 ++++++++ 3 files changed, 44 insertions(+), 23 deletions(-) diff --git a/neqo-client/src/main.rs b/neqo-client/src/main.rs index e7af2a3779..6bcbfae9be 100644 --- a/neqo-client/src/main.rs +++ b/neqo-client/src/main.rs @@ -403,22 +403,21 @@ fn process_loop( loop { poll.poll( &mut events, - if timeout.is_some() { - timeout - } else { - Some(Duration::from_millis(0)) - }, + timeout.or_else(|| Some(Duration::from_millis(0))), )?; + let mut datagrams: Vec = Vec::new(); 'read: loop { match socket.recv_from(&mut buf[..]) { + Err(ref err) + if err.kind() == ErrorKind::WouldBlock + || err.kind() == ErrorKind::Interrupted => + { + break 'read + } Err(ref err) => { - if err.kind() == ErrorKind::WouldBlock || err.kind() == ErrorKind::Interrupted { - break 'read; - } - eprintln!("UDP error: {}", err); - exit(1) + exit(1); } Ok((sz, remote)) => { if sz == buf.len() { @@ -427,12 +426,15 @@ fn process_loop( } if sz > 0 { let d = Datagram::new(remote, *local_addr, &buf[..sz]); - client.process_input(d, Instant::now()); - handler.maybe_key_update(client)?; + datagrams.push(d); } } }; } + if !datagrams.is_empty() { + client.process_multiple_input(datagrams, Instant::now()); + handler.maybe_key_update(client)?; + } if let Http3State::Closed(..) = client.state() { return Ok(client.state()); @@ -1346,23 +1348,20 @@ mod old { loop { poll.poll( &mut events, - if timeout.is_some() { - timeout - } else { - Some(Duration::from_millis(0)) - }, + timeout.or_else(|| Some(Duration::from_millis(0))), )?; 'read: loop { match socket.recv_from(&mut buf[..]) { - Err(ref err) => { + Err(ref err) if err.kind() == ErrorKind::WouldBlock - || err.kind() == ErrorKind::Interrupted - { - break 'read; - } + || err.kind() == ErrorKind::Interrupted => + { + break 'read + } + Err(ref err) => { eprintln!("UDP error: {}", err); - exit(1) + exit(1); } Ok((sz, remote)) => { if sz == buf.len() { diff --git a/neqo-http3/src/connection_client.rs b/neqo-http3/src/connection_client.rs index 51cd8e2935..3cb6d94c8a 100644 --- a/neqo-http3/src/connection_client.rs +++ b/neqo-http3/src/connection_client.rs @@ -828,6 +828,15 @@ impl Http3Client { self.process_http3(now); } + pub fn process_multiple_input(&mut self, dgrams: Vec, now: Instant) { + qtrace!([self], "Process multiple datagrams, len={}", dgrams.len()); + if dgrams.is_empty() { + return; + } + self.conn.process_multiple_input(dgrams, now); + self.process_http3(now); + } + /// This should not be used because it gives access to functionalities that may disrupt the /// proper functioning of the HTTP/3 session. /// Only used by `neqo-interop`. diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index abb7e590ad..c074956253 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -921,6 +921,19 @@ impl Connection { self.streams.cleanup_closed_streams(); } + /// Process new input datagrams on the connection. + pub fn process_multiple_input(&mut self, dgrams: Vec, now: Instant) { + if dgrams.is_empty() { + return; + } + + for d in dgrams { + self.input(d, now, now); + } + self.process_saved(now); + self.streams.cleanup_closed_streams(); + } + /// Get the time that we next need to be called back, relative to `now`. fn next_delay(&mut self, now: Instant, paced: bool) -> Duration { qtrace!([self], "Get callback delay {:?}", now);