From ec5c9a10e23e709ac26a86acea3235b5e0ecea11 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 10 May 2022 17:19:24 +0200 Subject: [PATCH 1/3] swarm/src/connection: Prioritize handler over negotiating streams The `HandlerWrapper` polls three components: 1. `ConnectionHandler` 2. Outbound negotiating streams 3. Inbound negotiating streams The `ConnectionHandler` itself might itself poll already negotiated streams. By polling the three components above in the listed order one: - Prioritizes local work and work coming from negotiated streams over negotiating streams. - Prioritizes outbound negotiating streams over inbound negotiating streams, i.e. outbound requests over inbound requests. --- swarm/src/connection/handler_wrapper.rs | 80 +++++++++++++++---------- 1 file changed, 47 insertions(+), 33 deletions(-) diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index ad487913729..29b0f543df8 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -307,27 +307,57 @@ where Error, >, > { - while let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) { - match res { - Ok(upgrade) => self - .handler - .inject_fully_negotiated_inbound(upgrade, user_data), - Err(err) => self.handler.inject_listen_upgrade_error(user_data, err), + loop { + // Poll the [`ConnectionHandler`]. + if let Poll::Ready(res) = self.handler.poll(cx) { + match res { + ConnectionHandlerEvent::Custom(event) => { + return Poll::Ready(Ok(Event::Custom(event))); + } + ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { + let id = self.unique_dial_upgrade_id; + let timeout = *protocol.timeout(); + self.unique_dial_upgrade_id += 1; + let (upgrade, info) = protocol.into_upgrade(); + self.queued_dial_upgrades.push((id, SendWrapper(upgrade))); + return Poll::Ready(Ok(Event::OutboundSubstreamRequest(( + id, info, timeout, + )))); + } + ConnectionHandlerEvent::Close(err) => return Poll::Ready(Err(err.into())), + } } - } - while let Poll::Ready(Some((user_data, res))) = self.negotiating_out.poll_next_unpin(cx) { - match res { - Ok(upgrade) => self - .handler - .inject_fully_negotiated_outbound(upgrade, user_data), - Err(err) => self.handler.inject_dial_upgrade_error(user_data, err), + // In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams. + if let Poll::Ready(Some((user_data, res))) = self.negotiating_out.poll_next_unpin(cx) { + match res { + Ok(upgrade) => self + .handler + .inject_fully_negotiated_outbound(upgrade, user_data), + Err(err) => self.handler.inject_dial_upgrade_error(user_data, err), + } + + // After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress. + continue; } - } - // Poll the handler at the end so that we see the consequences of the method - // calls on `self.handler`. - let poll_result = self.handler.poll(cx); + // In case both the [`ConnectionHandler`] and the negotiating outbound streams can not + // make any more progress, poll the negotiating inbound streams. + if let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) { + match res { + Ok(upgrade) => self + .handler + .inject_fully_negotiated_inbound(upgrade, user_data), + Err(err) => self.handler.inject_listen_upgrade_error(user_data, err), + } + + // After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress. + continue; + } + + // None of the three can make any more progress, thus breaking the loop. + break; + } // Ask the handler whether it wants the connection (and the handler itself) // to be kept alive, which determines the planned shutdown, if any. @@ -349,22 +379,6 @@ where (_, KeepAlive::Yes) => self.shutdown = Shutdown::None, }; - match poll_result { - Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { - return Poll::Ready(Ok(Event::Custom(event))); - } - Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { - let id = self.unique_dial_upgrade_id; - let timeout = *protocol.timeout(); - self.unique_dial_upgrade_id += 1; - let (upgrade, info) = protocol.into_upgrade(); - self.queued_dial_upgrades.push((id, SendWrapper(upgrade))); - return Poll::Ready(Ok(Event::OutboundSubstreamRequest((id, info, timeout)))); - } - Poll::Ready(ConnectionHandlerEvent::Close(err)) => return Poll::Ready(Err(err.into())), - Poll::Pending => (), - }; - // Check if the connection (and handler) should be shut down. // As long as we're still negotiating substreams, shutdown is always postponed. if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() { From 1de8039e853f45c248a2e860b898dcac50121499 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 May 2022 18:33:41 +0200 Subject: [PATCH 2/3] swarm/src/connection/: Extract into handle_connection_handler_event --- swarm/src/connection/handler_wrapper.rs | 46 ++++++++++++++++--------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index 29b0f543df8..d73023a5f7d 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -298,6 +298,32 @@ where self.handler.inject_address_change(new_address); } + fn handle_connection_handler_event( + &mut self, + handler_event: ConnectionHandlerEvent< + TProtoHandler::OutboundProtocol, + TProtoHandler::OutboundOpenInfo, + TProtoHandler::OutEvent, + TProtoHandler::Error, + >, + ) -> Result< + Event, TProtoHandler::OutEvent>, + Error, + > { + match handler_event { + ConnectionHandlerEvent::Custom(event) => Ok(Event::Custom(event)), + ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { + let id = self.unique_dial_upgrade_id; + let timeout = *protocol.timeout(); + self.unique_dial_upgrade_id += 1; + let (upgrade, info) = protocol.into_upgrade(); + self.queued_dial_upgrades.push((id, SendWrapper(upgrade))); + Ok(Event::OutboundSubstreamRequest((id, info, timeout))) + } + ConnectionHandlerEvent::Close(err) => Err(err.into()), + } + } + pub fn poll( &mut self, cx: &mut Context<'_>, @@ -309,23 +335,9 @@ where > { loop { // Poll the [`ConnectionHandler`]. - if let Poll::Ready(res) = self.handler.poll(cx) { - match res { - ConnectionHandlerEvent::Custom(event) => { - return Poll::Ready(Ok(Event::Custom(event))); - } - ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { - let id = self.unique_dial_upgrade_id; - let timeout = *protocol.timeout(); - self.unique_dial_upgrade_id += 1; - let (upgrade, info) = protocol.into_upgrade(); - self.queued_dial_upgrades.push((id, SendWrapper(upgrade))); - return Poll::Ready(Ok(Event::OutboundSubstreamRequest(( - id, info, timeout, - )))); - } - ConnectionHandlerEvent::Close(err) => return Poll::Ready(Err(err.into())), - } + if let Poll::Ready(handler_event) = self.handler.poll(cx) { + let wrapper_event = self.handle_connection_handler_event(handler_event)?; + return Poll::Ready(Ok(wrapper_event)); } // In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams. From 550c6782681fdcfea71eb35e17ffaa23dc65dd21 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 18 May 2022 10:38:58 +0200 Subject: [PATCH 3/3] swarm/src/connection: Rename TProtoHandler to TConnectionHandler --- swarm/src/connection/handler_wrapper.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index 13e66736809..b10077d41b3 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -301,14 +301,14 @@ where fn handle_connection_handler_event( &mut self, handler_event: ConnectionHandlerEvent< - TProtoHandler::OutboundProtocol, - TProtoHandler::OutboundOpenInfo, - TProtoHandler::OutEvent, - TProtoHandler::Error, + TConnectionHandler::OutboundProtocol, + TConnectionHandler::OutboundOpenInfo, + TConnectionHandler::OutEvent, + TConnectionHandler::Error, >, ) -> Result< - Event, TProtoHandler::OutEvent>, - Error, + Event, TConnectionHandler::OutEvent>, + Error, > { match handler_event { ConnectionHandlerEvent::Custom(event) => Ok(Event::Custom(event)),