Skip to content

Commit

Permalink
Enhance socket read to handle multiple packets (#1530)
Browse files Browse the repository at this point in the history
* Enhance socket read to handle multiple packets

* Enhance socket read to handle multiple packets
  • Loading branch information
KershawChang authored Jan 5, 2024
1 parent 096c46b commit 0c421d2
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 66 deletions.
1 change: 1 addition & 0 deletions neqo-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ neqo-qpack = { path = "./../neqo-qpack" }
structopt = "0.3.7"
url = "2.0"
qlog = "0.10.0"
mio = "0.6.17"

[features]
deny-warnings = []
180 changes: 114 additions & 66 deletions neqo-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

use qlog::{events::EventImportance, streamer::QlogStreamer};

use mio::{net::UdpSocket, Events, Poll, PollOpt, Ready, Token};

use neqo_common::{self as common, event::Provider, hex, qlog::NeqoQlog, Datagram, Role};
use neqo_crypto::{
constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256},
Expand All @@ -30,7 +32,7 @@ use std::{
fmt::{self, Display},
fs::{create_dir_all, File, OpenOptions},
io::{self, ErrorKind, Write},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs},
path::PathBuf,
process::exit,
rc::Rc,
Expand Down Expand Up @@ -338,8 +340,8 @@ impl QuicParameters {
}
}

fn emit_datagram(socket: &UdpSocket, d: Datagram) -> io::Result<()> {
let sent = socket.send_to(&d[..], d.destination())?;
fn emit_datagram(socket: &mio::net::UdpSocket, d: Datagram) -> io::Result<()> {
let sent = socket.send_to(&d[..], &d.destination())?;
if sent != d.len() {
eprintln!("Unable to send all {} bytes of datagram", d.len());
}
Expand Down Expand Up @@ -391,63 +393,80 @@ fn get_output_file(
fn process_loop(
local_addr: &SocketAddr,
socket: &UdpSocket,
poll: &Poll,
client: &mut Http3Client,
handler: &mut Handler,
) -> Res<neqo_http3::Http3State> {
let buf = &mut [0u8; 2048];
let mut events = Events::with_capacity(1024);
let mut timeout: Option<Duration> = None;
loop {
poll.poll(
&mut events,
timeout.or_else(|| Some(Duration::from_millis(0))),
)?;

let mut datagrams: Vec<Datagram> = Vec::new();
'read: loop {
match socket.recv_from(&mut buf[..]) {
Err(ref err)
if err.kind() == ErrorKind::WouldBlock
|| err.kind() == ErrorKind::Interrupted =>
{
break 'read
}
Err(ref err) => {
eprintln!("UDP error: {}", err);
exit(1);
}
Ok((sz, remote)) => {
if sz == buf.len() {
eprintln!("Received more than {} bytes", buf.len());
break 'read;
}
if sz > 0 {
let d = Datagram::new(remote, *local_addr, &buf[..sz]);
datagrams.push(d);
}
}
};
}
if !datagrams.is_empty() {
client.process_multiple_input(datagrams, Instant::now());
handler.maybe_key_update(client)?;
}

if let Http3State::Closed(..) = client.state() {
return Ok(client.state());
}

let mut exiting = !handler.handle(client)?;

loop {
'write: loop {
match client.process_output(Instant::now()) {
Output::Datagram(dgram) => {
if let Err(e) = emit_datagram(socket, dgram) {
eprintln!("UDP write error: {}", e);
client.close(Instant::now(), 0, e.to_string());
exiting = true;
break;
break 'write;
}
}
Output::Callback(duration) => {
socket.set_read_timeout(Some(duration)).unwrap();
break;
Output::Callback(new_timeout) => {
timeout = Some(new_timeout);
break 'write;
}
Output::None => {
// Not strictly necessary, since we're about to exit
socket.set_read_timeout(None).unwrap();
exiting = true;
break;
break 'write;
}
}
}

if exiting {
return Ok(client.state());
}

match socket.recv_from(&mut buf[..]) {
Err(ref err)
if err.kind() == ErrorKind::WouldBlock || err.kind() == ErrorKind::Interrupted => {}
Err(err) => {
eprintln!("UDP error: {}", err);
exit(1)
}
Ok((sz, remote)) => {
if sz == buf.len() {
eprintln!("Received more than {} bytes", buf.len());
continue;
}
if sz > 0 {
let d = Datagram::new(remote, *local_addr, &buf[..sz]);
client.process_input(d, Instant::now());
handler.maybe_key_update(client)?;
}
}
};
}
}

Expand Down Expand Up @@ -802,6 +821,7 @@ fn handle_test(
testcase: &String,
args: &mut Args,
socket: &UdpSocket,
poll: &Poll,
local_addr: SocketAddr,
remote_addr: SocketAddr,
hostname: &str,
Expand All @@ -823,7 +843,7 @@ fn handle_test(
args,
};
let mut h = Handler::new(url_handler, key_update, args.output_read_data);
process_loop(&local_addr, socket, &mut client, &mut h)?;
process_loop(&local_addr, socket, poll, &mut client, &mut h)?;
}
_ => {
eprintln!("Unsupported test case: {}", testcase);
Expand Down Expand Up @@ -877,9 +897,11 @@ fn create_http3_client(
Ok(client)
}

#[allow(clippy::too_many_arguments)]
fn client(
args: &mut Args,
socket: &UdpSocket,
poll: &Poll,
local_addr: SocketAddr,
remote_addr: SocketAddr,
hostname: &str,
Expand All @@ -892,6 +914,7 @@ fn client(
&testcase,
args,
socket,
poll,
local_addr,
remote_addr,
hostname,
Expand All @@ -912,7 +935,7 @@ fn client(
};
let mut h = Handler::new(url_handler, key_update, args.output_read_data);

process_loop(&local_addr, socket, &mut client, &mut h)?;
process_loop(&local_addr, socket, poll, &mut client, &mut h)?;

let token = if args.resume {
// If we haven't received an event, take a token if there is one.
Expand Down Expand Up @@ -1026,14 +1049,22 @@ fn main() -> Res<()> {
SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::from([0; 16])), 0),
};

let socket = match UdpSocket::bind(local_addr) {
let socket = match UdpSocket::bind(&local_addr) {
Err(e) => {
eprintln!("Unable to bind UDP socket: {}", e);
exit(1)
}
Ok(s) => s,
};

let poll = Poll::new()?;
poll.register(
&socket,
Token(0),
Ready::readable() | Ready::writable(),
PollOpt::edge(),
)?;

let real_local = socket.local_addr().unwrap();
println!(
"{} Client connecting: {:?} -> {:?}",
Expand Down Expand Up @@ -1071,6 +1102,7 @@ fn main() -> Res<()> {
old::old_client(
&args,
&socket,
&poll,
real_local,
remote_addr,
&hostname,
Expand All @@ -1081,6 +1113,7 @@ fn main() -> Res<()> {
client(
&mut args,
&socket,
&poll,
real_local,
remote_addr,
&hostname,
Expand All @@ -1100,17 +1133,17 @@ mod old {
collections::{HashMap, VecDeque},
fs::File,
io::{ErrorKind, Write},
net::{SocketAddr, UdpSocket},
net::SocketAddr,
path::PathBuf,
process::exit,
rc::Rc,
time::Instant,
time::{Duration, Instant},
};

use url::Url;

use super::{qlog_new, KeyUpdateState, Res};

use mio::{Events, Poll};
use neqo_common::{event::Provider, Datagram};
use neqo_crypto::{AuthenticationStatus, ResumptionToken};
use neqo_transport::{
Expand Down Expand Up @@ -1304,70 +1337,85 @@ mod old {

fn process_loop_old(
local_addr: &SocketAddr,
socket: &UdpSocket,
socket: &mio::net::UdpSocket,
poll: &Poll,
client: &mut Connection,
handler: &mut HandlerOld,
) -> Res<State> {
let buf = &mut [0u8; 2048];
let mut events = Events::with_capacity(1024);
let mut timeout: Option<Duration> = None;
loop {
poll.poll(
&mut events,
timeout.or_else(|| Some(Duration::from_millis(0))),
)?;

'read: loop {
match socket.recv_from(&mut buf[..]) {
Err(ref err)
if err.kind() == ErrorKind::WouldBlock
|| err.kind() == ErrorKind::Interrupted =>
{
break 'read
}
Err(ref err) => {
eprintln!("UDP error: {}", err);
exit(1);
}
Ok((sz, remote)) => {
if sz == buf.len() {
eprintln!("Received more than {} bytes", buf.len());
break 'read;
}
if sz > 0 {
let d = Datagram::new(remote, *local_addr, &buf[..sz]);
client.process_input(d, Instant::now());
handler.maybe_key_update(client)?;
}
}
};
}

if let State::Closed(..) = client.state() {
return Ok(client.state().clone());
}

let mut exiting = !handler.handle(client)?;

loop {
'write: loop {
match client.process_output(Instant::now()) {
Output::Datagram(dgram) => {
if let Err(e) = emit_datagram(socket, dgram) {
eprintln!("UDP write error: {}", e);
client.close(Instant::now(), 0, e.to_string());
exiting = true;
break;
break 'write;
}
}
Output::Callback(duration) => {
socket.set_read_timeout(Some(duration)).unwrap();
break;
Output::Callback(new_timeout) => {
timeout = Some(new_timeout);
break 'write;
}
Output::None => {
// Not strictly necessary, since we're about to exit
socket.set_read_timeout(None).unwrap();
exiting = true;
break;
break 'write;
}
}
}

if exiting {
return Ok(client.state().clone());
}

match socket.recv_from(&mut buf[..]) {
Err(err) => {
if err.kind() != ErrorKind::WouldBlock && err.kind() != ErrorKind::Interrupted {
eprintln!("UDP error: {}", err);
exit(1);
}
}
Ok((sz, addr)) => {
if sz == buf.len() {
eprintln!("Received more than {} bytes", buf.len());
continue;
}
if sz > 0 {
let d = Datagram::new(addr, *local_addr, &buf[..sz]);
client.process_input(d, Instant::now());
handler.maybe_key_update(client)?;
}
}
}
}
}

#[allow(clippy::too_many_arguments)]
pub fn old_client(
args: &Args,
socket: &UdpSocket,
socket: &mio::net::UdpSocket,
poll: &Poll,
local_addr: SocketAddr,
remote_addr: SocketAddr,
origin: &str,
Expand Down Expand Up @@ -1410,7 +1458,7 @@ mod old {
key_update,
};

process_loop_old(&local_addr, socket, &mut client, &mut h)?;
process_loop_old(&local_addr, socket, poll, &mut client, &mut h)?;

let token = if args.resume {
// If we haven't received an event, take a token if there is one.
Expand Down
9 changes: 9 additions & 0 deletions neqo-http3/src/connection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,15 @@ impl Http3Client {
self.process_http3(now);
}

pub fn process_multiple_input(&mut self, dgrams: Vec<Datagram>, now: Instant) {
qtrace!([self], "Process multiple datagrams, len={}", dgrams.len());
if dgrams.is_empty() {
return;
}
self.conn.process_multiple_input(dgrams, now);
self.process_http3(now);
}

/// This should not be used because it gives access to functionalities that may disrupt the
/// proper functioning of the HTTP/3 session.
/// Only used by `neqo-interop`.
Expand Down
Loading

0 comments on commit 0c421d2

Please sign in to comment.