Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/src/connection: Prioritize handler over negotiating streams #2638

Merged
merged 5 commits into from
May 18, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 59 additions & 33 deletions swarm/src/connection/handler_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,32 @@ where
self.handler.inject_address_change(new_address);
}

fn handle_connection_handler_event(
&mut self,
handler_event: ConnectionHandlerEvent<
TConnectionHandler::OutboundProtocol,
TConnectionHandler::OutboundOpenInfo,
TConnectionHandler::OutEvent,
TConnectionHandler::Error,
>,
) -> Result<
Event<OutboundOpenInfo<TConnectionHandler>, TConnectionHandler::OutEvent>,
Error<TConnectionHandler::Error>,
> {
match handler_event {
ConnectionHandlerEvent::Custom(event) => Ok(Event::Custom(event)),
ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
let id = self.unique_dial_upgrade_id;
let timeout = *protocol.timeout();
self.unique_dial_upgrade_id += 1;
let (upgrade, info) = protocol.into_upgrade();
self.queued_dial_upgrades.push((id, SendWrapper(upgrade)));
Ok(Event::OutboundSubstreamRequest((id, info, timeout)))
}
ConnectionHandlerEvent::Close(err) => Err(err.into()),
}
}

pub fn poll(
&mut self,
cx: &mut Context<'_>,
Expand All @@ -307,27 +333,43 @@ where
Error<TConnectionHandler::Error>,
>,
> {
while let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) {
match res {
Ok(upgrade) => self
.handler
.inject_fully_negotiated_inbound(upgrade, user_data),
Err(err) => self.handler.inject_listen_upgrade_error(user_data, err),
loop {
// Poll the [`ConnectionHandler`].
if let Poll::Ready(handler_event) = self.handler.poll(cx) {
let wrapper_event = self.handle_connection_handler_event(handler_event)?;
return Poll::Ready(Ok(wrapper_event));
}
}

while let Poll::Ready(Some((user_data, res))) = self.negotiating_out.poll_next_unpin(cx) {
match res {
Ok(upgrade) => self
.handler
.inject_fully_negotiated_outbound(upgrade, user_data),
Err(err) => self.handler.inject_dial_upgrade_error(user_data, err),
// In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams.
if let Poll::Ready(Some((user_data, res))) = self.negotiating_out.poll_next_unpin(cx) {
match res {
Ok(upgrade) => self
.handler
.inject_fully_negotiated_outbound(upgrade, user_data),
Err(err) => self.handler.inject_dial_upgrade_error(user_data, err),
}

// After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress.
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could also recurse here and thus avoid the loop? Not sure if that is necessarily easier to understand.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow. What do you mean with "recurse"? As in a recursive call to the same method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. You should be able to just call self.poll here I believe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not seen recursion used on any serious Rust project. Not saying that it does not exist. Thus far I have avoided recursion in Rust due to:

  • No Tail Call Ellimination
  • Call stack limits
  • Sensing that this is not idiomatic, thus not intuitive

Am I missing something? Do the above arguments not apply here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in this particular case, the recursion is very likely to stop after one call, is it not?

But I understand that this might be too much of a detail to notice and thus make the code harder to understand. Also, it is not guaranteed and thus risky to use with the usual problems of recursion.

Happy for the loop to stay, was just an idea :)

}

// In case both the [`ConnectionHandler`] and the negotiating outbound streams can not
// make any more progress, poll the negotiating inbound streams.
if let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) {
match res {
Ok(upgrade) => self
.handler
.inject_fully_negotiated_inbound(upgrade, user_data),
Err(err) => self.handler.inject_listen_upgrade_error(user_data, err),
}

// After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress.
continue;
}
}

// Poll the handler at the end so that we see the consequences of the method
// calls on `self.handler`.
let poll_result = self.handler.poll(cx);
// None of the three can make any more progress, thus breaking the loop.
break;
}

// Ask the handler whether it wants the connection (and the handler itself)
// to be kept alive, which determines the planned shutdown, if any.
Expand All @@ -349,22 +391,6 @@ where
(_, KeepAlive::Yes) => self.shutdown = Shutdown::None,
};

match poll_result {
Poll::Ready(ConnectionHandlerEvent::Custom(event)) => {
return Poll::Ready(Ok(Event::Custom(event)));
}
Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => {
let id = self.unique_dial_upgrade_id;
let timeout = *protocol.timeout();
self.unique_dial_upgrade_id += 1;
let (upgrade, info) = protocol.into_upgrade();
self.queued_dial_upgrades.push((id, SendWrapper(upgrade)));
return Poll::Ready(Ok(Event::OutboundSubstreamRequest((id, info, timeout))));
}
Poll::Ready(ConnectionHandlerEvent::Close(err)) => return Poll::Ready(Err(err.into())),
Poll::Pending => (),
};

// Check if the connection (and handler) should be shut down.
// As long as we're still negotiating substreams, shutdown is always postponed.
if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() {
Expand Down