Skip to content

Commit

Permalink
Don't await input if there are events pending
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Apr 5, 2024
1 parent 7681cf0 commit c2b0721
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 15 deletions.
4 changes: 4 additions & 0 deletions neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ impl super::Client for Connection {
fn stats(&self) -> neqo_transport::Stats {
self.stats()
}

fn has_events(&self) -> bool {
neqo_common::event::Provider::has_events(self)
}
}

impl<'b> Handler<'b> {
Expand Down
4 changes: 4 additions & 0 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ impl super::Client for Http3Client {
fn stats(&self) -> neqo_transport::Stats {
self.transport_stats()
}

fn has_events(&self) -> bool {
neqo_common::event::Provider::has_events(self)
}
}

impl<'a> super::Handler for Handler<'a> {
Expand Down
30 changes: 18 additions & 12 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ trait Handler {
trait Client {
fn process_output(&mut self, now: Instant) -> Output;
fn process_input(&mut self, dgram: &Datagram, now: Instant);
fn has_events(&self) -> bool;
fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
S: AsRef<str> + Display;
Expand All @@ -391,20 +392,21 @@ impl<'a, H: Handler> Runner<'a, H> {
let handler_done = self.handler.handle(&mut self.client)?;

match (handler_done, self.args.resume, self.handler.has_token()) {
// Handler isn't done. Continue.
(false, _, _) => {},
// Handler done. Resumption token needed but not present. Continue.
(true, true, false) => {
qdebug!("Handler done. Waiting for resumption token.");
}
// Handler is done, no resumption token needed. Close.
(true, false, _) |
// Handler is done, resumption token needed and present. Close.
(true, true, true) => {
self.client.close(Instant::now(), 0, "kthxbye!");
}
// Handler isn't done. Continue.
(false, _, _) => {},
// Handler done. Resumption token needed but not present. Continue.
(true, true, false) => {
qdebug!("Handler done. Waiting for resumption token.");
}
// Handler is done, no resumption token needed. Close.
(true, false, _) |
// Handler is done, resumption token needed and present. Close.
(true, true, true) => {
self.client.close(Instant::now(), 0, "kthxbye!");
}
}

// TODO: Rename process_output.
self.process().await?;

if self.client.is_closed() {
Expand All @@ -414,6 +416,10 @@ impl<'a, H: Handler> Runner<'a, H> {
return Ok(self.handler.take_token());
}

if self.client.has_events() {
continue;
}

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgrams = self.socket.recv(&self.local_addr)?;
Expand Down
16 changes: 13 additions & 3 deletions neqo-bin/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ fn qns_read_response(filename: &str) -> Option<Vec<u8>> {
trait HttpServer: Display {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output;
fn process_events(&mut self, args: &Args, now: Instant);
fn has_events(&self) -> bool;
fn set_qlog_dir(&mut self, dir: Option<PathBuf>);
fn set_ciphers(&mut self, ciphers: &[Cipher]);
fn validate_address(&mut self, when: ValidateAddress);
Expand Down Expand Up @@ -434,6 +435,10 @@ impl HttpServer for SimpleServer {
.unwrap();
self.server.ech_config()
}

fn has_events(&self) -> bool {
self.server.has_events()
}
}

struct ServersRunner {
Expand Down Expand Up @@ -575,6 +580,14 @@ impl ServersRunner {

async fn run(&mut self) -> Res<()> {
loop {
self.server.process_events(&self.args, self.args.now());

self.process(None).await?;

if self.server.has_events() {
continue;
}

match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
Expand All @@ -592,9 +605,6 @@ impl ServersRunner {
self.process(None).await?;
}
}

self.server.process_events(&self.args, self.args.now());
self.process(None).await?;
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions neqo-bin/src/server/old_https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ impl HttpServer for Http09Server {
.expect("enable ECH");
self.server.ech_config()
}

fn has_events(&self) -> bool {
self.server.has_active_connections()
}
}

impl Display for Http09Server {
Expand Down
7 changes: 7 additions & 0 deletions neqo-transport/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,13 @@ impl Server {
mem::take(&mut self.active).into_iter().collect()
}

/// Whether any connections have received new events as a result of calling
/// `process()`.
#[must_use]
pub fn has_active_connections(&self) -> bool {
!self.active.is_empty()
}

pub fn add_to_waiting(&mut self, c: &ActiveConnectionRef) {
self.waiting.push_back(c.connection());
}
Expand Down

0 comments on commit c2b0721

Please sign in to comment.