Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Refactor NotifsHandler::poll (#8422)
Browse files Browse the repository at this point in the history
* Refactor a bit NotifsHandler::poll

* Avoid some spurious wake-ups
  • Loading branch information
tomaka authored Mar 23, 2021
1 parent 83942f5 commit 067f185
Showing 1 changed file with 79 additions and 64 deletions.
143 changes: 79 additions & 64 deletions client/network/src/protocol/notifications/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,10 @@ enum State {
/// We use two different channels in order to have two different channel sizes, but from
/// the receiving point of view, the two channels are the same.
/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
notifications_sink_rx: stream::Select<
notifications_sink_rx: stream::Peekable<stream::Select<
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>
>,
>>,

/// Outbound substream that has been accepted by the remote.
///
Expand Down Expand Up @@ -552,7 +552,7 @@ impl ProtocolsHandler for NotifsHandler {
};

self.protocols[protocol_index].state = State::Open {
notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()),
notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
out_substream: Some(substream),
in_substream: in_substream.take(),
};
Expand Down Expand Up @@ -716,8 +716,80 @@ impl ProtocolsHandler for NotifsHandler {
return Poll::Ready(ev);
}

// For each open substream, try send messages from `notifications_sink_rx` to the
// substream.
for protocol_index in 0..self.protocols.len() {
if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. }
= &mut self.protocols[protocol_index].state
{
loop {
// Only proceed with `out_substream.poll_ready_unpin` if there is an element
// available in `notifications_sink_rx`. This avoids waking up the task when
// a substream is ready to send if there isn't actually something to send.
match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) => {
return Poll::Ready(
ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)
);
},
Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
Poll::Ready(None) | Poll::Pending => break,
}

// Before we extract the element from `notifications_sink_rx`, check that the
// substream is ready to accept a message.
match out_substream.poll_ready_unpin(cx) {
Poll::Ready(_) => {},
Poll::Pending => break
}

// Now that the substream is ready for a message, grab what to send.
let message = match notifications_sink_rx.poll_next_unpin(cx) {
Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => message,
Poll::Ready(Some(NotificationsSinkMessage::ForceClose))
| Poll::Ready(None)
| Poll::Pending => {
// Should never be reached, as per `poll_peek` above.
debug_assert!(false);
break;
}
};

let _ = out_substream.start_send_unpin(message);
// Note that flushing is performed later down this function.
}
}
}

// Flush all outbound substreams.
// When `poll` returns `Poll::Ready`, the libp2p `Swarm` may decide to no longer call
// `poll` again before it is ready to accept more events.
// In order to make sure that substreams are flushed as soon as possible, the flush is
// performed before the code paths that can produce `Ready` (with some rare exceptions).
// Importantly, however, the flush is performed *after* notifications are queued with
// `Sink::start_send`.
for protocol_index in 0..self.protocols.len() {
match &mut self.protocols[protocol_index].state {
State::Open { out_substream: out_substream @ Some(_), .. } => {
match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
Poll::Pending | Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_)) => {
*out_substream = None;
let event = NotifsHandlerOut::CloseDesired { protocol_index };
return Poll::Ready(ProtocolsHandlerEvent::Custom(event));
}
};
}

State::Closed { .. } |
State::Opening { .. } |
State::Open { out_substream: None, .. } |
State::OpenDesiredByRemote { .. } => {}
}
}

// Poll inbound substreams.
for protocol_index in 0..self.protocols.len() {
// Poll inbound substreams.
// Inbound substreams being closed is always tolerated, except for the
// `OpenDesiredByRemote` state which might need to be switched back to `Closed`.
match &mut self.protocols[protocol_index].state {
Expand Down Expand Up @@ -763,68 +835,11 @@ impl ProtocolsHandler for NotifsHandler {
}
}
}

// Poll outbound substream.
match &mut self.protocols[protocol_index].state {
State::Open { out_substream: out_substream @ Some(_), .. } => {
match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
Poll::Pending | Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_)) => {
*out_substream = None;
let event = NotifsHandlerOut::CloseDesired { protocol_index };
return Poll::Ready(ProtocolsHandlerEvent::Custom(event));
}
};
}

State::Closed { .. } |
State::Opening { .. } |
State::Open { out_substream: None, .. } |
State::OpenDesiredByRemote { .. } => {}
}

if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. }
= &mut self.protocols[protocol_index].state
{
loop {
// Before we poll the notifications sink receiver, check that the substream
// is ready to accept a message.
match out_substream.poll_ready_unpin(cx) {
Poll::Ready(_) => {},
Poll::Pending => break
}

// Now that all substreams are ready for a message, grab what to send.
let message = match notifications_sink_rx.poll_next_unpin(cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) | Poll::Pending => break,
};

match message {
NotificationsSinkMessage::Notification { message } => {
let _ = out_substream.start_send_unpin(message);

// Calling `start_send_unpin` only queues the message. Actually
// emitting the message is done with `poll_flush`. In order to
// not introduce too much complexity, this flushing is done earlier
// in the body of this `poll()` method. As such, we schedule a task
// wake-up now in order to guarantee that `poll()` will be called
// again and the flush happening.
// At the time of the writing of this comment, a rewrite of this
// code is being planned. If you find this comment in the wild and
// the rewrite didn't happen, please consider a refactor.
cx.waker().wake_by_ref();
}
NotificationsSinkMessage::ForceClose => {
return Poll::Ready(
ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)
);
}
}
}
}
}

// This is the only place in this method that can return `Pending`.
// By putting it at the very bottom, we are guaranteed that everything has been properly
// polled.
Poll::Pending
}
}

0 comments on commit 067f185

Please sign in to comment.