diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 01087c2bf..7ca56ceff 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -1061,11 +1061,7 @@ impl State { ConnectionLost { reason } => { self.terminate(reason, shared); } - Stream(StreamEvent::Writable { id }) => { - if let Some(writer) = self.blocked_writers.remove(&id) { - writer.wake(); - } - } + Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers), Stream(StreamEvent::Opened { dir: Dir::Uni }) => { shared.stream_incoming[Dir::Uni as usize].notify_waiters(); } @@ -1078,27 +1074,15 @@ impl State { DatagramsUnblocked => { shared.datagrams_unblocked.notify_waiters(); } - Stream(StreamEvent::Readable { id }) => { - if let Some(reader) = self.blocked_readers.remove(&id) { - reader.wake(); - } - } + Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers), Stream(StreamEvent::Available { dir }) => { // Might mean any number of streams are ready, so we wake up everyone shared.stream_budget_available[dir as usize].notify_waiters(); } - Stream(StreamEvent::Finished { id }) => { - if let Some(stopped) = self.stopped.remove(&id) { - stopped.wake(); - } - } + Stream(StreamEvent::Finished { id }) => wake_stream(id, &mut self.stopped), Stream(StreamEvent::Stopped { id, .. }) => { - if let Some(stopped) = self.stopped.remove(&id) { - stopped.wake(); - } - if let Some(writer) = self.blocked_writers.remove(&id) { - writer.wake(); - } + wake_stream(id, &mut self.stopped); + wake_stream(id, &mut self.blocked_writers); } } } @@ -1167,12 +1151,8 @@ impl State { if let Some(x) = self.on_handshake_data.take() { let _ = x.send(()); } - for (_, writer) in self.blocked_writers.drain() { - writer.wake() - } - for (_, reader) in self.blocked_readers.drain() { - reader.wake() - } + wake_all(&mut self.blocked_writers); + wake_all(&mut self.blocked_readers); shared.stream_budget_available[Dir::Uni as usize].notify_waiters(); shared.stream_budget_available[Dir::Bi as usize].notify_waiters(); shared.stream_incoming[Dir::Uni as usize].notify_waiters(); @@ -1182,9 +1162,7 @@ impl State { if let Some(x) = self.on_connected.take() { let _ = x.send(false); } - for (_, waker) in self.stopped.drain() { - waker.wake(); - } + wake_all(&mut self.stopped); shared.closed.notify_waiters(); } @@ -1228,6 +1206,16 @@ impl fmt::Debug for State { } } +fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap) { + if let Some(waker) = wakers.remove(&stream_id) { + waker.wake(); + } +} + +fn wake_all(wakers: &mut FxHashMap) { + wakers.drain().for_each(|(_, waker)| waker.wake()) +} + /// Errors that can arise when sending a datagram #[derive(Debug, Error, Clone, Eq, PartialEq)] pub enum SendDatagramError {