Skip to content

Commit

Permalink
[libp2p-swarm] Correct returned connections from notify_all. (#1513)
Browse files Browse the repository at this point in the history
* [libp2p-swarm] Correct returned connections from notify_all.

If at least one connection was not ready (i.e. pending), only
those (pending) connections would be returned and considered on the next
iteration, whereas those which were ready should also remain
in the list of connections to notify on retry of `notify_all`.

* Simplify.

It seems unnecessary to use "poll all" -> "send all" semantics,
i.e. attempting an "atomic" broadcast. Rather, events send via
`notify_all` can be delivered as soon as possible, simplifying
the code further.
  • Loading branch information
romanb committed Mar 25, 2020
1 parent 5f86f15 commit 28ea62d
Showing 1 changed file with 16 additions and 19 deletions.
35 changes: 16 additions & 19 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,9 +681,9 @@ where
/// Notify all of the given connections of a peer of an event.
///
/// Returns `Some` with the given event and a new list of connections if
/// at least one of the given connections is not currently able to receive the event
/// but is not closing, in which case the current task is scheduled to be woken up.
/// The returned connections are those which are not closing.
/// at least one of the given connections is currently not able to receive
/// the event, in which case the current task is scheduled to be woken up and
/// the returned connections are those which still need to receive the event.
///
/// Returns `None` if all connections are either closing or the event
/// was successfully sent to all handlers whose connections are not closing,
Expand All @@ -707,26 +707,23 @@ where
}
}

{
let mut pending = SmallVec::new();
for id in ids.iter() {
if let Some(mut conn) = peer.connection(*id) { // (*)
if conn.poll_ready_notify_handler(cx).is_pending() {
pending.push(*id)
}
let mut pending = SmallVec::new();
for id in ids.into_iter() {
if let Some(mut conn) = peer.connection(id) {
match conn.poll_ready_notify_handler(cx) {
Poll::Pending => pending.push(id),
Poll::Ready(Ok(())) => {
// Can now only fail due to the connection suddenly closing,
// which we ignore.
let _ = conn.notify_handler(event.clone());
},
Poll::Ready(Err(())) => {} // connection is closing
}
}
if !pending.is_empty() {
return Some((event, pending))
}
}

for id in ids.into_iter() {
if let Some(mut conn) = peer.connection(id) {
// All connections were ready. Can now only fail due
// to a connection suddenly closing, which we ignore.
let _ = conn.notify_handler(event.clone());
}
if !pending.is_empty() {
return Some((event, pending))
}

None
Expand Down

0 comments on commit 28ea62d

Please sign in to comment.