Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance socket read to handle multiple packets #1530

Merged
merged 2 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions neqo-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ neqo-qpack = { path = "./../neqo-qpack" }
structopt = "0.3.7"
url = "2.0"
qlog = "0.10.0"
mio = "0.6.17"

[features]
deny-warnings = []
180 changes: 114 additions & 66 deletions neqo-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

use qlog::{events::EventImportance, streamer::QlogStreamer};

use mio::{net::UdpSocket, Events, Poll, PollOpt, Ready, Token};

use neqo_common::{self as common, event::Provider, hex, qlog::NeqoQlog, Datagram, Role};
use neqo_crypto::{
constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256},
Expand All @@ -30,7 +32,7 @@ use std::{
fmt::{self, Display},
fs::{create_dir_all, File, OpenOptions},
io::{self, ErrorKind, Write},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs},
path::PathBuf,
process::exit,
rc::Rc,
Expand Down Expand Up @@ -338,8 +340,8 @@ impl QuicParameters {
}
}

fn emit_datagram(socket: &UdpSocket, d: Datagram) -> io::Result<()> {
let sent = socket.send_to(&d[..], d.destination())?;
fn emit_datagram(socket: &mio::net::UdpSocket, d: Datagram) -> io::Result<()> {
let sent = socket.send_to(&d[..], &d.destination())?;
if sent != d.len() {
eprintln!("Unable to send all {} bytes of datagram", d.len());
}
Expand Down Expand Up @@ -391,63 +393,80 @@ fn get_output_file(
fn process_loop(
local_addr: &SocketAddr,
socket: &UdpSocket,
poll: &Poll,
client: &mut Http3Client,
handler: &mut Handler,
) -> Res<neqo_http3::Http3State> {
let buf = &mut [0u8; 2048];
let mut events = Events::with_capacity(1024);
let mut timeout: Option<Duration> = None;
loop {
poll.poll(
&mut events,
timeout.or_else(|| Some(Duration::from_millis(0))),
)?;

let mut datagrams: Vec<Datagram> = Vec::new();
'read: loop {
match socket.recv_from(&mut buf[..]) {
Err(ref err)
if err.kind() == ErrorKind::WouldBlock
|| err.kind() == ErrorKind::Interrupted =>
{
break 'read
}
Err(ref err) => {
eprintln!("UDP error: {}", err);
exit(1);
}
Ok((sz, remote)) => {
if sz == buf.len() {
eprintln!("Received more than {} bytes", buf.len());
break 'read;
}
if sz > 0 {
let d = Datagram::new(remote, *local_addr, &buf[..sz]);
datagrams.push(d);
}
}
};
}
if !datagrams.is_empty() {
client.process_multiple_input(datagrams, Instant::now());
handler.maybe_key_update(client)?;
}

if let Http3State::Closed(..) = client.state() {
return Ok(client.state());
}

let mut exiting = !handler.handle(client)?;

loop {
'write: loop {
match client.process_output(Instant::now()) {
Output::Datagram(dgram) => {
if let Err(e) = emit_datagram(socket, dgram) {
eprintln!("UDP write error: {}", e);
client.close(Instant::now(), 0, e.to_string());
exiting = true;
break;
break 'write;
}
}
Output::Callback(duration) => {
socket.set_read_timeout(Some(duration)).unwrap();
break;
Output::Callback(new_timeout) => {
timeout = Some(new_timeout);
break 'write;
}
Output::None => {
// Not strictly necessary, since we're about to exit
socket.set_read_timeout(None).unwrap();
exiting = true;
break;
break 'write;
}
}
}

if exiting {
return Ok(client.state());
}

match socket.recv_from(&mut buf[..]) {
Err(ref err)
if err.kind() == ErrorKind::WouldBlock || err.kind() == ErrorKind::Interrupted => {}
Err(err) => {
eprintln!("UDP error: {}", err);
exit(1)
}
Ok((sz, remote)) => {
if sz == buf.len() {
eprintln!("Received more than {} bytes", buf.len());
continue;
}
if sz > 0 {
let d = Datagram::new(remote, *local_addr, &buf[..sz]);
client.process_input(d, Instant::now());
handler.maybe_key_update(client)?;
}
}
};
}
}

Expand Down Expand Up @@ -802,6 +821,7 @@ fn handle_test(
testcase: &String,
args: &mut Args,
socket: &UdpSocket,
poll: &Poll,
local_addr: SocketAddr,
remote_addr: SocketAddr,
hostname: &str,
Expand All @@ -823,7 +843,7 @@ fn handle_test(
args,
};
let mut h = Handler::new(url_handler, key_update, args.output_read_data);
process_loop(&local_addr, socket, &mut client, &mut h)?;
process_loop(&local_addr, socket, poll, &mut client, &mut h)?;
}
_ => {
eprintln!("Unsupported test case: {}", testcase);
Expand Down Expand Up @@ -877,9 +897,11 @@ fn create_http3_client(
Ok(client)
}

#[allow(clippy::too_many_arguments)]
fn client(
args: &mut Args,
socket: &UdpSocket,
poll: &Poll,
local_addr: SocketAddr,
remote_addr: SocketAddr,
hostname: &str,
Expand All @@ -892,6 +914,7 @@ fn client(
&testcase,
args,
socket,
poll,
local_addr,
remote_addr,
hostname,
Expand All @@ -912,7 +935,7 @@ fn client(
};
let mut h = Handler::new(url_handler, key_update, args.output_read_data);

process_loop(&local_addr, socket, &mut client, &mut h)?;
process_loop(&local_addr, socket, poll, &mut client, &mut h)?;

let token = if args.resume {
// If we haven't received an event, take a token if there is one.
Expand Down Expand Up @@ -1026,14 +1049,22 @@ fn main() -> Res<()> {
SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::from([0; 16])), 0),
};

let socket = match UdpSocket::bind(local_addr) {
let socket = match UdpSocket::bind(&local_addr) {
Err(e) => {
eprintln!("Unable to bind UDP socket: {}", e);
exit(1)
}
Ok(s) => s,
};

let poll = Poll::new()?;
poll.register(
&socket,
Token(0),
Ready::readable() | Ready::writable(),
PollOpt::edge(),
)?;

let real_local = socket.local_addr().unwrap();
println!(
"{} Client connecting: {:?} -> {:?}",
Expand Down Expand Up @@ -1071,6 +1102,7 @@ fn main() -> Res<()> {
old::old_client(
&args,
&socket,
&poll,
real_local,
remote_addr,
&hostname,
Expand All @@ -1081,6 +1113,7 @@ fn main() -> Res<()> {
client(
&mut args,
&socket,
&poll,
real_local,
remote_addr,
&hostname,
Expand All @@ -1100,17 +1133,17 @@ mod old {
collections::{HashMap, VecDeque},
fs::File,
io::{ErrorKind, Write},
net::{SocketAddr, UdpSocket},
net::SocketAddr,
path::PathBuf,
process::exit,
rc::Rc,
time::Instant,
time::{Duration, Instant},
};

use url::Url;

use super::{qlog_new, KeyUpdateState, Res};

use mio::{Events, Poll};
use neqo_common::{event::Provider, Datagram};
use neqo_crypto::{AuthenticationStatus, ResumptionToken};
use neqo_transport::{
Expand Down Expand Up @@ -1304,70 +1337,85 @@ mod old {

fn process_loop_old(
local_addr: &SocketAddr,
socket: &UdpSocket,
socket: &mio::net::UdpSocket,
poll: &Poll,
client: &mut Connection,
handler: &mut HandlerOld,
) -> Res<State> {
let buf = &mut [0u8; 2048];
let mut events = Events::with_capacity(1024);
let mut timeout: Option<Duration> = None;
loop {
poll.poll(
&mut events,
timeout.or_else(|| Some(Duration::from_millis(0))),
)?;

'read: loop {
match socket.recv_from(&mut buf[..]) {
Err(ref err)
if err.kind() == ErrorKind::WouldBlock
|| err.kind() == ErrorKind::Interrupted =>
{
break 'read
}
Err(ref err) => {
eprintln!("UDP error: {}", err);
exit(1);
}
Ok((sz, remote)) => {
if sz == buf.len() {
eprintln!("Received more than {} bytes", buf.len());
break 'read;
}
if sz > 0 {
let d = Datagram::new(remote, *local_addr, &buf[..sz]);
client.process_input(d, Instant::now());
handler.maybe_key_update(client)?;
}
}
};
}

if let State::Closed(..) = client.state() {
return Ok(client.state().clone());
}

let mut exiting = !handler.handle(client)?;

loop {
'write: loop {
match client.process_output(Instant::now()) {
Output::Datagram(dgram) => {
if let Err(e) = emit_datagram(socket, dgram) {
eprintln!("UDP write error: {}", e);
client.close(Instant::now(), 0, e.to_string());
exiting = true;
break;
break 'write;
}
}
Output::Callback(duration) => {
socket.set_read_timeout(Some(duration)).unwrap();
break;
Output::Callback(new_timeout) => {
timeout = Some(new_timeout);
break 'write;
}
Output::None => {
// Not strictly necessary, since we're about to exit
socket.set_read_timeout(None).unwrap();
exiting = true;
break;
break 'write;
}
}
}

if exiting {
return Ok(client.state().clone());
}

match socket.recv_from(&mut buf[..]) {
Err(err) => {
if err.kind() != ErrorKind::WouldBlock && err.kind() != ErrorKind::Interrupted {
eprintln!("UDP error: {}", err);
exit(1);
}
}
Ok((sz, addr)) => {
if sz == buf.len() {
eprintln!("Received more than {} bytes", buf.len());
continue;
}
if sz > 0 {
let d = Datagram::new(addr, *local_addr, &buf[..sz]);
client.process_input(d, Instant::now());
handler.maybe_key_update(client)?;
}
}
}
}
}

#[allow(clippy::too_many_arguments)]
pub fn old_client(
args: &Args,
socket: &UdpSocket,
socket: &mio::net::UdpSocket,
poll: &Poll,
local_addr: SocketAddr,
remote_addr: SocketAddr,
origin: &str,
Expand Down Expand Up @@ -1410,7 +1458,7 @@ mod old {
key_update,
};

process_loop_old(&local_addr, socket, &mut client, &mut h)?;
process_loop_old(&local_addr, socket, poll, &mut client, &mut h)?;

let token = if args.resume {
// If we haven't received an event, take a token if there is one.
Expand Down
9 changes: 9 additions & 0 deletions neqo-http3/src/connection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,15 @@ impl Http3Client {
self.process_http3(now);
}

pub fn process_multiple_input(&mut self, dgrams: Vec<Datagram>, now: Instant) {
qtrace!([self], "Process multiple datagrams, len={}", dgrams.len());
if dgrams.is_empty() {
return;
}
self.conn.process_multiple_input(dgrams, now);
self.process_http3(now);
}

/// This should not be used because it gives access to functionalities that may disrupt the
/// proper functioning of the HTTP/3 session.
/// Only used by `neqo-interop`.
Expand Down
Loading
Loading