diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index 13bd22c1715..b10077d41b3 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -298,6 +298,32 @@ where self.handler.inject_address_change(new_address); } + fn handle_connection_handler_event( + &mut self, + handler_event: ConnectionHandlerEvent< + TConnectionHandler::OutboundProtocol, + TConnectionHandler::OutboundOpenInfo, + TConnectionHandler::OutEvent, + TConnectionHandler::Error, + >, + ) -> Result< + Event, TConnectionHandler::OutEvent>, + Error, + > { + match handler_event { + ConnectionHandlerEvent::Custom(event) => Ok(Event::Custom(event)), + ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { + let id = self.unique_dial_upgrade_id; + let timeout = *protocol.timeout(); + self.unique_dial_upgrade_id += 1; + let (upgrade, info) = protocol.into_upgrade(); + self.queued_dial_upgrades.push((id, SendWrapper(upgrade))); + Ok(Event::OutboundSubstreamRequest((id, info, timeout))) + } + ConnectionHandlerEvent::Close(err) => Err(err.into()), + } + } + pub fn poll( &mut self, cx: &mut Context<'_>, @@ -307,27 +333,43 @@ where Error, >, > { - while let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) { - match res { - Ok(upgrade) => self - .handler - .inject_fully_negotiated_inbound(upgrade, user_data), - Err(err) => self.handler.inject_listen_upgrade_error(user_data, err), + loop { + // Poll the [`ConnectionHandler`]. + if let Poll::Ready(handler_event) = self.handler.poll(cx) { + let wrapper_event = self.handle_connection_handler_event(handler_event)?; + return Poll::Ready(Ok(wrapper_event)); } - } - while let Poll::Ready(Some((user_data, res))) = self.negotiating_out.poll_next_unpin(cx) { - match res { - Ok(upgrade) => self - .handler - .inject_fully_negotiated_outbound(upgrade, user_data), - Err(err) => self.handler.inject_dial_upgrade_error(user_data, err), + // In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams. + if let Poll::Ready(Some((user_data, res))) = self.negotiating_out.poll_next_unpin(cx) { + match res { + Ok(upgrade) => self + .handler + .inject_fully_negotiated_outbound(upgrade, user_data), + Err(err) => self.handler.inject_dial_upgrade_error(user_data, err), + } + + // After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress. + continue; + } + + // In case both the [`ConnectionHandler`] and the negotiating outbound streams can not + // make any more progress, poll the negotiating inbound streams. + if let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) { + match res { + Ok(upgrade) => self + .handler + .inject_fully_negotiated_inbound(upgrade, user_data), + Err(err) => self.handler.inject_listen_upgrade_error(user_data, err), + } + + // After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress. + continue; } - } - // Poll the handler at the end so that we see the consequences of the method - // calls on `self.handler`. - let poll_result = self.handler.poll(cx); + // None of the three can make any more progress, thus breaking the loop. + break; + } // Ask the handler whether it wants the connection (and the handler itself) // to be kept alive, which determines the planned shutdown, if any. @@ -349,22 +391,6 @@ where (_, KeepAlive::Yes) => self.shutdown = Shutdown::None, }; - match poll_result { - Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { - return Poll::Ready(Ok(Event::Custom(event))); - } - Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { - let id = self.unique_dial_upgrade_id; - let timeout = *protocol.timeout(); - self.unique_dial_upgrade_id += 1; - let (upgrade, info) = protocol.into_upgrade(); - self.queued_dial_upgrades.push((id, SendWrapper(upgrade))); - return Poll::Ready(Ok(Event::OutboundSubstreamRequest((id, info, timeout)))); - } - Poll::Ready(ConnectionHandlerEvent::Close(err)) => return Poll::Ready(Err(err.into())), - Poll::Pending => (), - }; - // Check if the connection (and handler) should be shut down. // As long as we're still negotiating substreams, shutdown is always postponed. if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() {