From c2b0721f6de6a5ae4690b5c6372803b0a548f995 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 5 Apr 2024 13:31:31 +0200 Subject: [PATCH] Don't await input if there are events pending --- neqo-bin/src/client/http09.rs | 4 ++++ neqo-bin/src/client/http3.rs | 4 ++++ neqo-bin/src/client/mod.rs | 30 ++++++++++++++++++------------ neqo-bin/src/server/mod.rs | 16 +++++++++++++--- neqo-bin/src/server/old_https.rs | 4 ++++ neqo-transport/src/server.rs | 7 +++++++ 6 files changed, 50 insertions(+), 15 deletions(-) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index 7d30017bb..d2d7a7be5 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -160,6 +160,10 @@ impl super::Client for Connection { fn stats(&self) -> neqo_transport::Stats { self.stats() } + + fn has_events(&self) -> bool { + neqo_common::event::Provider::has_events(self) + } } impl<'b> Handler<'b> { diff --git a/neqo-bin/src/client/http3.rs b/neqo-bin/src/client/http3.rs index 486992c51..c3a45dd50 100644 --- a/neqo-bin/src/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -133,6 +133,10 @@ impl super::Client for Http3Client { fn stats(&self) -> neqo_transport::Stats { self.transport_stats() } + + fn has_events(&self) -> bool { + neqo_common::event::Provider::has_events(self) + } } impl<'a> super::Handler for Handler<'a> { diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index e322a622e..5b686be8c 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -369,6 +369,7 @@ trait Handler { trait Client { fn process_output(&mut self, now: Instant) -> Output; fn process_input(&mut self, dgram: &Datagram, now: Instant); + fn has_events(&self) -> bool; fn close(&mut self, now: Instant, app_error: AppError, msg: S) where S: AsRef + Display; @@ -391,20 +392,21 @@ impl<'a, H: Handler> Runner<'a, H> { let handler_done = self.handler.handle(&mut self.client)?; match (handler_done, self.args.resume, self.handler.has_token()) { - // Handler isn't done. Continue. - (false, _, _) => {}, - // Handler done. Resumption token needed but not present. Continue. - (true, true, false) => { - qdebug!("Handler done. Waiting for resumption token."); - } - // Handler is done, no resumption token needed. Close. - (true, false, _) | - // Handler is done, resumption token needed and present. Close. - (true, true, true) => { - self.client.close(Instant::now(), 0, "kthxbye!"); - } + // Handler isn't done. Continue. + (false, _, _) => {}, + // Handler done. Resumption token needed but not present. Continue. + (true, true, false) => { + qdebug!("Handler done. Waiting for resumption token."); + } + // Handler is done, no resumption token needed. Close. + (true, false, _) | + // Handler is done, resumption token needed and present. Close. + (true, true, true) => { + self.client.close(Instant::now(), 0, "kthxbye!"); } + } + // TODO: Rename process_output. self.process().await?; if self.client.is_closed() { @@ -414,6 +416,10 @@ impl<'a, H: Handler> Runner<'a, H> { return Ok(self.handler.take_token()); } + if self.client.has_events() { + continue; + } + match ready(self.socket, self.timeout.as_mut()).await? { Ready::Socket => loop { let dgrams = self.socket.recv(&self.local_addr)?; diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index 75cb42368..ced3d62c3 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -215,6 +215,7 @@ fn qns_read_response(filename: &str) -> Option> { trait HttpServer: Display { fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; fn process_events(&mut self, args: &Args, now: Instant); + fn has_events(&self) -> bool; fn set_qlog_dir(&mut self, dir: Option); fn set_ciphers(&mut self, ciphers: &[Cipher]); fn validate_address(&mut self, when: ValidateAddress); @@ -434,6 +435,10 @@ impl HttpServer for SimpleServer { .unwrap(); self.server.ech_config() } + + fn has_events(&self) -> bool { + self.server.has_events() + } } struct ServersRunner { @@ -575,6 +580,14 @@ impl ServersRunner { async fn run(&mut self) -> Res<()> { loop { + self.server.process_events(&self.args, self.args.now()); + + self.process(None).await?; + + if self.server.has_events() { + continue; + } + match self.ready().await? { Ready::Socket(inx) => loop { let (host, socket) = self.sockets.get_mut(inx).unwrap(); @@ -592,9 +605,6 @@ impl ServersRunner { self.process(None).await?; } } - - self.server.process_events(&self.args, self.args.now()); - self.process(None).await?; } } } diff --git a/neqo-bin/src/server/old_https.rs b/neqo-bin/src/server/old_https.rs index 505a16578..948a25182 100644 --- a/neqo-bin/src/server/old_https.rs +++ b/neqo-bin/src/server/old_https.rs @@ -247,6 +247,10 @@ impl HttpServer for Http09Server { .expect("enable ECH"); self.server.ech_config() } + + fn has_events(&self) -> bool { + self.server.has_active_connections() + } } impl Display for Http09Server { diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 7d3d144a0..13817b6a0 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -686,6 +686,13 @@ impl Server { mem::take(&mut self.active).into_iter().collect() } + /// Whether any connections have received new events as a result of calling + /// `process()`. + #[must_use] + pub fn has_active_connections(&self) -> bool { + !self.active.is_empty() + } + pub fn add_to_waiting(&mut self, c: &ActiveConnectionRef) { self.waiting.push_back(c.connection()); }