From 993c5055695638a485d4f2371ace8a6baca09987 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Sun, 23 Jun 2024 12:07:13 +0200 Subject: [PATCH 1/2] quinn: introduce wake_all() helper --- quinn/src/connection.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 01087c2bf..d25841569 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -1167,12 +1167,8 @@ impl State { if let Some(x) = self.on_handshake_data.take() { let _ = x.send(()); } - for (_, writer) in self.blocked_writers.drain() { - writer.wake() - } - for (_, reader) in self.blocked_readers.drain() { - reader.wake() - } + wake_all(&mut self.blocked_writers); + wake_all(&mut self.blocked_readers); shared.stream_budget_available[Dir::Uni as usize].notify_waiters(); shared.stream_budget_available[Dir::Bi as usize].notify_waiters(); shared.stream_incoming[Dir::Uni as usize].notify_waiters(); @@ -1182,9 +1178,7 @@ impl State { if let Some(x) = self.on_connected.take() { let _ = x.send(false); } - for (_, waker) in self.stopped.drain() { - waker.wake(); - } + wake_all(&mut self.stopped); shared.closed.notify_waiters(); } @@ -1228,6 +1222,10 @@ impl fmt::Debug for State { } } +fn wake_all(wakers: &mut FxHashMap) { + wakers.drain().for_each(|(_, waker)| waker.wake()) +} + /// Errors that can arise when sending a datagram #[derive(Debug, Error, Clone, Eq, PartialEq)] pub enum SendDatagramError { From bdade5b5fa98aff0975a4f8ef5d89b5f95ddce5e Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Sun, 23 Jun 2024 12:11:45 +0200 Subject: [PATCH 2/2] quinn: introduce wake_stream() helper --- quinn/src/connection.rs | 32 +++++++++++--------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index d25841569..7ca56ceff 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -1061,11 +1061,7 @@ impl State { ConnectionLost { reason } => { self.terminate(reason, shared); } - Stream(StreamEvent::Writable { id }) => { - if let Some(writer) = self.blocked_writers.remove(&id) { - writer.wake(); - } - } + Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers), Stream(StreamEvent::Opened { dir: Dir::Uni }) => { shared.stream_incoming[Dir::Uni as usize].notify_waiters(); } @@ -1078,27 +1074,15 @@ impl State { DatagramsUnblocked => { shared.datagrams_unblocked.notify_waiters(); } - Stream(StreamEvent::Readable { id }) => { - if let Some(reader) = self.blocked_readers.remove(&id) { - reader.wake(); - } - } + Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers), Stream(StreamEvent::Available { dir }) => { // Might mean any number of streams are ready, so we wake up everyone shared.stream_budget_available[dir as usize].notify_waiters(); } - Stream(StreamEvent::Finished { id }) => { - if let Some(stopped) = self.stopped.remove(&id) { - stopped.wake(); - } - } + Stream(StreamEvent::Finished { id }) => wake_stream(id, &mut self.stopped), Stream(StreamEvent::Stopped { id, .. }) => { - if let Some(stopped) = self.stopped.remove(&id) { - stopped.wake(); - } - if let Some(writer) = self.blocked_writers.remove(&id) { - writer.wake(); - } + wake_stream(id, &mut self.stopped); + wake_stream(id, &mut self.blocked_writers); } } } @@ -1222,6 +1206,12 @@ impl fmt::Debug for State { } } +fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap) { + if let Some(waker) = wakers.remove(&stream_id) { + waker.wake(); + } +} + fn wake_all(wakers: &mut FxHashMap) { wakers.drain().for_each(|(_, waker)| waker.wake()) }