diff --git a/neqo-client/Cargo.toml b/neqo-client/Cargo.toml index ddf33cca0..4ca69647e 100644 --- a/neqo-client/Cargo.toml +++ b/neqo-client/Cargo.toml @@ -9,7 +9,8 @@ rust-version = "1.70.0" license = "MIT OR Apache-2.0" [dependencies] -mio = "~0.6.23" +futures = "0.3" +log = {version = "0.4.17", default-features = false} neqo-common = { path="./../neqo-common" } neqo-crypto = { path = "./../neqo-crypto" } neqo-http3 = { path = "./../neqo-http3" } @@ -17,6 +18,7 @@ neqo-qpack = { path = "./../neqo-qpack" } neqo-transport = { path = "./../neqo-transport" } qlog = "0.12.0" structopt = "0.3" +tokio = { version = "1", features = ["net", "time", "macros", "rt", "rt-multi-thread"] } url = "~2.5.0" [features] diff --git a/neqo-client/src/main.rs b/neqo-client/src/main.rs index 3db90aac1..d8444542e 100644 --- a/neqo-client/src/main.rs +++ b/neqo-client/src/main.rs @@ -13,9 +13,10 @@ use std::{ convert::TryFrom, fmt::{self, Display}, fs::{create_dir_all, File, OpenOptions}, - io::{self, ErrorKind, Write}, + io::{self, Write}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}, path::PathBuf, + pin::Pin, process::exit, rc::Rc, str::FromStr, @@ -23,8 +24,13 @@ use std::{ }; use common::IpTos; -use mio::{net::UdpSocket, Events, Poll, PollOpt, Ready, Token}; -use neqo_common::{self as common, event::Provider, hex, qlog::NeqoQlog, Datagram, Role}; +use futures::{ + future::{select, Either}, + FutureExt, TryFutureExt, +}; +use neqo_common::{ + self as common, event::Provider, hex, qdebug, qinfo, qlog::NeqoQlog, Datagram, Role, +}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, init, AuthenticationStatus, Cipher, ResumptionToken, @@ -39,6 +45,7 @@ use neqo_transport::{ }; use qlog::{events::EventImportance, streamer::QlogStreamer}; use structopt::StructOpt; +use tokio::{net::UdpSocket, time::Sleep}; use url::{Origin, Url}; #[derive(Debug)] @@ -343,10 +350,17 @@ impl QuicParameters { } } -fn emit_datagram(socket: &mio::net::UdpSocket, d: Datagram) -> io::Result<()> { - let sent = socket.send_to(&d[..], &d.destination())?; - if sent != d.len() { - eprintln!("Unable to send all {} bytes of datagram", d.len()); +async fn emit_datagram(socket: &UdpSocket, out_dgram: Datagram) -> Result<(), io::Error> { + let sent = match socket.send_to(&out_dgram, &out_dgram.destination()).await { + Ok(res) => res, + Err(ref err) if err.kind() != io::ErrorKind::WouldBlock => { + eprintln!("UDP send error: {err:?}"); + 0 + } + Err(e) => return Err(e), + }; + if sent != out_dgram.len() { + eprintln!("Unable to send all {} bytes of datagram", out_dgram.len()); } Ok(()) } @@ -393,86 +407,59 @@ fn get_output_file( } } -fn process_loop( - local_addr: &SocketAddr, +enum Ready { + Socket, + Timeout, +} + +// Wait for the socket to be readable or the timeout to fire. +async fn ready( socket: &UdpSocket, - poll: &Poll, - client: &mut Http3Client, - handler: &mut Handler, -) -> Res { + mut timeout: Option<&mut Pin>>, +) -> Result { + let socket_ready = Box::pin(socket.readable()).map_ok(|()| Ready::Socket); + let timeout_ready = timeout + .as_mut() + .map(Either::Left) + .unwrap_or(Either::Right(futures::future::pending())) + .map(|()| Ok(Ready::Timeout)); + select(socket_ready, timeout_ready).await.factor_first().0 +} + +fn read_dgram( + socket: &UdpSocket, + local_address: &SocketAddr, +) -> Result, io::Error> { let buf = &mut [0u8; 2048]; - let mut events = Events::with_capacity(1024); - let mut timeout = Duration::new(0, 0); - loop { - poll.poll(&mut events, Some(timeout))?; - - let mut datagrams: Vec = Vec::new(); - 'read: loop { - match socket.recv_from(&mut buf[..]) { - Err(ref err) - if err.kind() == ErrorKind::WouldBlock - || err.kind() == ErrorKind::Interrupted => - { - break 'read - } - Err(ref err) => { - eprintln!("UDP error: {err}"); - exit(1); - } - Ok((sz, remote)) => { - if sz == buf.len() { - eprintln!("Received more than {} bytes", buf.len()); - break 'read; - } - if sz > 0 { - let d = - Datagram::new(remote, *local_addr, IpTos::default(), None, &buf[..sz]); - datagrams.push(d); - } - } - }; + let (sz, remote_addr) = match socket.try_recv_from(&mut buf[..]) { + Err(ref err) + if err.kind() == io::ErrorKind::WouldBlock + || err.kind() == io::ErrorKind::Interrupted => + { + return Ok(None) } - if !datagrams.is_empty() { - client.process_multiple_input(&datagrams, Instant::now()); - handler.maybe_key_update(client)?; + Err(err) => { + eprintln!("UDP recv error: {err:?}"); + return Err(err); } + Ok(res) => res, + }; - if let Http3State::Closed(..) = client.state() { - return Ok(client.state()); - } - - let mut exiting = !handler.handle(client)?; - - 'write: loop { - match client.process_output(Instant::now()) { - Output::Datagram(dgram) => { - if let Err(err) = emit_datagram(socket, dgram) { - if err.kind() == ErrorKind::WouldBlock - || err.kind() == ErrorKind::Interrupted - { - break 'write; - } - eprintln!("UDP write error: {err}"); - client.close(Instant::now(), 0, err.to_string()); - exiting = true; - break 'write; - } - } - Output::Callback(new_timeout) => { - timeout = new_timeout; - break 'write; - } - Output::None => { - // Not strictly necessary, since we're about to exit - exiting = true; - break 'write; - } - } - } + if sz == buf.len() { + eprintln!("Might have received more than {} bytes", buf.len()); + } - if exiting { - return Ok(client.state()); - } + if sz == 0 { + eprintln!("zero length datagram received?"); + Ok(None) + } else { + Ok(Some(Datagram::new( + remote_addr, + *local_address, + IpTos::default(), + None, + &buf[..sz], + ))) } } @@ -821,39 +808,122 @@ fn to_headers(values: &[impl AsRef]) -> Vec
{ .collect() } -#[allow(clippy::too_many_arguments)] -fn handle_test( - testcase: &String, - args: &mut Args, - socket: &UdpSocket, - poll: &Poll, +struct ClientRunner<'a> { local_addr: SocketAddr, - remote_addr: SocketAddr, - hostname: &str, - url_queue: VecDeque, - resumption_token: Option, -) -> Res> { - let key_update = KeyUpdateState(args.key_update); - if testcase.as_str() == "upload" { - let mut client = - create_http3_client(args, local_addr, remote_addr, hostname, resumption_token) - .expect("failed to create client"); - args.method = String::from("POST"); + socket: &'a UdpSocket, + client: Http3Client, + handler: Handler<'a>, + timeout: Option>>, + args: &'a Args, +} + +impl<'a> ClientRunner<'a> { + async fn new( + args: &'a mut Args, + socket: &'a UdpSocket, + local_addr: SocketAddr, + remote_addr: SocketAddr, + hostname: &str, + url_queue: VecDeque, + resumption_token: Option, + ) -> Res> { + if let Some(testcase) = &args.test { + if testcase.as_str() != "upload" { + eprintln!("Unsupported test case: {testcase}"); + exit(127) + } + } + + let client = create_http3_client(args, local_addr, remote_addr, hostname, resumption_token) + .expect("failed to create client"); + if args.test.is_some() { + args.method = String::from("POST"); + } + let key_update = KeyUpdateState(args.key_update); let url_handler = URLHandler { url_queue, stream_handlers: HashMap::new(), all_paths: Vec::new(), - handler_type: StreamHandlerType::Upload, + handler_type: if args.test.is_some() { + StreamHandlerType::Upload + } else { + StreamHandlerType::Download + }, args, }; - let mut h = Handler::new(url_handler, key_update, args.output_read_data); - process_loop(&local_addr, socket, poll, &mut client, &mut h)?; - } else { - eprintln!("Unsupported test case: {testcase}"); - exit(127) + let handler = Handler::new(url_handler, key_update, args.output_read_data); + + Ok(Self { + local_addr, + socket, + client, + handler, + timeout: None, + args, + }) + } + + async fn run(mut self) -> Res> { + loop { + if !self.handler.handle(&mut self.client)? { + break; + } + + self.process(None).await?; + + match ready(self.socket, self.timeout.as_mut()).await? { + Ready::Socket => loop { + let dgram = read_dgram(self.socket, &self.local_addr)?; + if dgram.is_none() { + break; + } + self.process(dgram.as_ref()).await?; + self.handler.maybe_key_update(&mut self.client)?; + }, + Ready::Timeout => { + self.timeout = None; + } + } + + if let Http3State::Closed(..) = self.client.state() { + break; + } + } + + let token = if self.args.test.is_none() && self.args.resume { + // If we haven't received an event, take a token if there is one. + // Lots of servers don't provide NEW_TOKEN, but a session ticket + // without NEW_TOKEN is better than nothing. + self.handler + .token + .take() + .or_else(|| self.client.take_resumption_token(Instant::now())) + } else { + None + }; + Ok(token) } - Ok(None) + async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { + loop { + match self.client.process(dgram.take(), Instant::now()) { + Output::Datagram(dgram) => { + emit_datagram(self.socket, dgram).await?; + } + Output::Callback(new_timeout) => { + qinfo!("Setting timeout of {:?}", new_timeout); + self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout))); + break; + } + Output::None => { + qdebug!("Output::None"); + break; + } + } + } + + Ok(()) + } } fn create_http3_client( @@ -899,58 +969,6 @@ fn create_http3_client( Ok(client) } -#[allow(clippy::too_many_arguments)] -fn client( - args: &mut Args, - socket: &UdpSocket, - poll: &Poll, - local_addr: SocketAddr, - remote_addr: SocketAddr, - hostname: &str, - url_queue: VecDeque, - resumption_token: Option, -) -> Res> { - let testcase = args.test.clone(); - if let Some(testcase) = testcase { - return handle_test( - &testcase, - args, - socket, - poll, - local_addr, - remote_addr, - hostname, - url_queue, - resumption_token, - ); - } - - let mut client = create_http3_client(args, local_addr, remote_addr, hostname, resumption_token) - .expect("failed to create client"); - let key_update = KeyUpdateState(args.key_update); - let url_handler = URLHandler { - url_queue, - stream_handlers: HashMap::new(), - all_paths: Vec::new(), - handler_type: StreamHandlerType::Download, - args, - }; - let mut h = Handler::new(url_handler, key_update, args.output_read_data); - - process_loop(&local_addr, socket, poll, &mut client, &mut h)?; - - let token = if args.resume { - // If we haven't received an event, take a token if there is one. - // Lots of servers don't provide NEW_TOKEN, but a session ticket - // without NEW_TOKEN is better than nothing. - h.token - .or_else(|| client.take_resumption_token(Instant::now())) - } else { - None - }; - Ok(token) -} - fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res { if let Some(qlog_dir) = &args.qlog_dir { let mut qlog_path = qlog_dir.to_path_buf(); @@ -980,7 +998,8 @@ fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res { } } -fn main() -> Res<()> { +#[tokio::main] +async fn main() -> Res<()> { init(); let mut args = Args::from_args(); @@ -1059,21 +1078,15 @@ fn main() -> Res<()> { SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::from([0; 16])), 0), }; - let socket = match UdpSocket::bind(&local_addr) { + let socket = match std::net::UdpSocket::bind(local_addr) { Err(e) => { eprintln!("Unable to bind UDP socket: {e}"); exit(1) } Ok(s) => s, }; - - let poll = Poll::new()?; - poll.register( - &socket, - Token(0), - Ready::readable() | Ready::writable(), - PollOpt::edge(), - )?; + socket.set_nonblocking(true)?; + let socket = UdpSocket::from_std(socket)?; let real_local = socket.local_addr().unwrap(); println!( @@ -1096,27 +1109,31 @@ fn main() -> Res<()> { first = false; token = if args.use_old_http { - old::old_client( + old::ClientRunner::new( &args, &socket, - &poll, real_local, remote_addr, &hostname, to_request, token, - )? + ) + .await? + .run() + .await? } else { - client( + ClientRunner::new( &mut args, &socket, - &poll, real_local, remote_addr, &hostname, to_request, token, - )? + ) + .await? + .run() + .await? }; } } @@ -1129,24 +1146,25 @@ mod old { cell::RefCell, collections::{HashMap, VecDeque}, fs::File, - io::{ErrorKind, Write}, + io::{self, Write}, net::SocketAddr, path::PathBuf, - process::exit, + pin::Pin, rc::Rc, - time::{Duration, Instant}, + time::Instant, }; - use mio::{Events, Poll}; - use neqo_common::{event::Provider, Datagram, IpTos}; + use neqo_common::{event::Provider, qdebug, qinfo, Datagram}; use neqo_crypto::{AuthenticationStatus, ResumptionToken}; use neqo_transport::{ Connection, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, StreamId, StreamType, }; + use tokio::{net::UdpSocket, time::Sleep}; use url::Url; - use super::{emit_datagram, get_output_file, qlog_new, Args, KeyUpdateState, Res}; + use super::{get_output_file, qlog_new, read_dgram, ready, Args, KeyUpdateState, Ready, Res}; + use crate::emit_datagram; struct HandlerOld<'b> { streams: HashMap>, @@ -1330,143 +1348,132 @@ mod old { } } - fn process_loop_old( - local_addr: &SocketAddr, - socket: &mio::net::UdpSocket, - poll: &Poll, - client: &mut Connection, - handler: &mut HandlerOld, - ) -> Res { - let buf = &mut [0u8; 2048]; - let mut events = Events::with_capacity(1024); - let mut timeout = Duration::new(0, 0); - loop { - poll.poll(&mut events, Some(timeout))?; - - 'read: loop { - match socket.recv_from(&mut buf[..]) { - Err(ref err) - if err.kind() == ErrorKind::WouldBlock - || err.kind() == ErrorKind::Interrupted => - { - break 'read - } - Err(ref err) => { - eprintln!("UDP error: {err}"); - exit(1); - } - Ok((sz, remote)) => { - if sz == buf.len() { - eprintln!("Received more than {} bytes", buf.len()); - break 'read; - } - if sz > 0 { - let d = Datagram::new( - remote, - *local_addr, - IpTos::default(), - None, - &buf[..sz], - ); - client.process_input(&d, Instant::now()); - handler.maybe_key_update(client)?; + pub struct ClientRunner<'a> { + local_addr: SocketAddr, + socket: &'a UdpSocket, + client: Connection, + handler: HandlerOld<'a>, + timeout: Option>>, + args: &'a Args, + } + + impl<'a> ClientRunner<'a> { + pub async fn new( + args: &'a Args, + socket: &'a UdpSocket, + local_addr: SocketAddr, + remote_addr: SocketAddr, + origin: &str, + url_queue: VecDeque, + token: Option, + ) -> Res> { + let alpn = match args.alpn.as_str() { + "hq-29" | "hq-30" | "hq-31" | "hq-32" => args.alpn.as_str(), + _ => "hq-interop", + }; + + let mut client = Connection::new_client( + origin, + &[alpn], + Rc::new(RefCell::new(EmptyConnectionIdGenerator::default())), + local_addr, + remote_addr, + args.quic_parameters.get(alpn), + Instant::now(), + )?; + + if let Some(tok) = token { + client.enable_resumption(Instant::now(), tok)?; + } + + let ciphers = args.get_ciphers(); + if !ciphers.is_empty() { + client.set_ciphers(&ciphers)?; + } + + client.set_qlog(qlog_new(args, origin, client.odcid().unwrap())?); + + let key_update = KeyUpdateState(args.key_update); + let handler = HandlerOld { + streams: HashMap::new(), + url_queue, + all_paths: Vec::new(), + args, + token: None, + key_update, + }; + + Ok(Self { + local_addr, + socket, + client, + handler, + timeout: None, + args, + }) + } + + pub async fn run(mut self) -> Res> { + loop { + if !self.handler.handle(&mut self.client)? { + break; + } + + self.process(None).await?; + + match ready(self.socket, self.timeout.as_mut()).await? { + Ready::Socket => loop { + let dgram = read_dgram(self.socket, &self.local_addr)?; + if dgram.is_none() { + break; } + self.process(dgram.as_ref()).await?; + self.handler.maybe_key_update(&mut self.client)?; + }, + Ready::Timeout => { + self.timeout = None; } - }; - } + } - if let State::Closed(..) = client.state() { - return Ok(client.state().clone()); + if let State::Closed(..) = self.client.state() { + break; + } } - let mut exiting = !handler.handle(client)?; + let token = if self.args.resume { + // If we haven't received an event, take a token if there is one. + // Lots of servers don't provide NEW_TOKEN, but a session ticket + // without NEW_TOKEN is better than nothing. + self.handler + .token + .take() + .or_else(|| self.client.take_resumption_token(Instant::now())) + } else { + None + }; + + Ok(token) + } - 'write: loop { - match client.process_output(Instant::now()) { + async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { + loop { + match self.client.process(dgram.take(), Instant::now()) { Output::Datagram(dgram) => { - if let Err(e) = emit_datagram(socket, dgram) { - eprintln!("UDP write error: {e}"); - client.close(Instant::now(), 0, e.to_string()); - exiting = true; - break 'write; - } + emit_datagram(self.socket, dgram).await?; } Output::Callback(new_timeout) => { - timeout = new_timeout; - break 'write; + qinfo!("Setting timeout of {:?}", new_timeout); + self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout))); + break; } Output::None => { - // Not strictly necessary, since we're about to exit - exiting = true; - break 'write; + qdebug!("Output::None"); + break; } } } - if exiting { - return Ok(client.state().clone()); - } - } - } - - #[allow(clippy::too_many_arguments)] - pub fn old_client( - args: &Args, - socket: &mio::net::UdpSocket, - poll: &Poll, - local_addr: SocketAddr, - remote_addr: SocketAddr, - origin: &str, - url_queue: VecDeque, - token: Option, - ) -> Res> { - let alpn = match args.alpn.as_str() { - "hq-29" | "hq-30" | "hq-31" | "hq-32" => args.alpn.as_str(), - _ => "hq-interop", - }; - - let mut client = Connection::new_client( - origin, - &[alpn], - Rc::new(RefCell::new(EmptyConnectionIdGenerator::default())), - local_addr, - remote_addr, - args.quic_parameters.get(alpn), - Instant::now(), - )?; - - if let Some(tok) = token { - client.enable_resumption(Instant::now(), tok)?; - } - - let ciphers = args.get_ciphers(); - if !ciphers.is_empty() { - client.set_ciphers(&ciphers)?; + Ok(()) } - - client.set_qlog(qlog_new(args, origin, client.odcid().unwrap())?); - - let key_update = KeyUpdateState(args.key_update); - let mut h = HandlerOld { - streams: HashMap::new(), - url_queue, - all_paths: Vec::new(), - args, - token: None, - key_update, - }; - - process_loop_old(&local_addr, socket, poll, &mut client, &mut h)?; - - let token = if args.resume { - // If we haven't received an event, take a token if there is one. - // Lots of servers don't provide NEW_TOKEN, but a session ticket - // without NEW_TOKEN is better than nothing. - h.token - .or_else(|| client.take_resumption_token(Instant::now())) - } else { - None - }; - Ok(token) } } diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index 2de388418..1678e0b8b 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -2585,10 +2585,16 @@ impl Connection { ) -> Res<()> { qtrace!([self], "Handshake space={} data={:0x?}", space, data); + let was_authentication_pending = + *self.crypto.tls.state() == HandshakeState::AuthenticationPending; let try_update = data.is_some(); match self.crypto.handshake(now, space, data)? { HandshakeState::Authenticated(_) | HandshakeState::InProgress => (), - HandshakeState::AuthenticationPending => self.events.authentication_needed(), + HandshakeState::AuthenticationPending => { + if !was_authentication_pending { + self.events.authentication_needed() + } + } HandshakeState::EchFallbackAuthenticationPending(public_name) => self .events .ech_fallback_authentication_needed(public_name.clone()), diff --git a/neqo-transport/src/connection/tests/handshake.rs b/neqo-transport/src/connection/tests/handshake.rs index 93385ac1b..a91ecf1b4 100644 --- a/neqo-transport/src/connection/tests/handshake.rs +++ b/neqo-transport/src/connection/tests/handshake.rs @@ -1135,3 +1135,54 @@ fn implicit_rtt_server() { // an RTT estimate from having discarded the Initial packet number space. assert_eq!(server.stats().rtt, RTT); } + +#[test] +fn emit_authentication_needed_once() { + let mut client = default_client(); + + let mut server = Connection::new_server( + test_fixture::LONG_CERT_KEYS, + test_fixture::DEFAULT_ALPN, + Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), + ConnectionParameters::default(), + ) + .expect("create a server"); + + let client1 = client.process(None, now()); + assert!(client1.as_dgram_ref().is_some()); + + // The entire server flight doesn't fit in a single packet because the + // certificate is large, therefore the server will produce 2 packets. + let server1 = server.process(client1.as_dgram_ref(), now()); + assert!(server1.as_dgram_ref().is_some()); + let server2 = server.process(None, now()); + assert!(server2.as_dgram_ref().is_some()); + + let authentication_needed_count = |client: &mut Connection| { + client + .events() + .filter(|e| matches!(e, ConnectionEvent::AuthenticationNeeded)) + .count() + }; + + // Upon receiving the first packet, the client has the server certificate, + // but not yet all required handshake data. It moves to + // `HandshakeState::AuthenticationPending` and emits a + // `ConnectionEvent::AuthenticationNeeded` event. + // + // Note that this is a tiny bit fragile in that it depends on having a certificate + // that is within a fairly narrow range of sizes. It has to fit in a single + // packet, but be large enough that the CertificateVerify message does not + // also fit in the same packet. Our default test setup achieves this, but + // changes to the setup might invalidate this test. + let _ = client.process(server1.as_dgram_ref(), now()); + assert_eq!(1, authentication_needed_count(&mut client)); + assert!(client.peer_certificate().is_some()); + + // The `AuthenticationNeeded` event is still pending a call to + // `Connection::authenticated`. On receiving the second packet from the + // server, the client must not emit a another + // `ConnectionEvent::AuthenticationNeeded`. + let _ = client.process(server2.as_dgram_ref(), now()); + assert_eq!(0, authentication_needed_count(&mut client)); +}