Skip to content

Commit

Permalink
Emit events for active connection close and fix disconnect().
Browse files Browse the repository at this point in the history
The `Network` does currently not emit events for actively
closed connections, e.g. via `EstablishedConnection::close`
or `ConnectedPeer::disconnect()`. As a result, when actively
closing connections, there will be `ConnectionEstablished`
events emitted without eventually a matching `ConnectionClosed`
event. This seems undesirable and has the consequence that
the `Swarm::ban_peer_id` feature in `libp2p-swarm` does not
result in appropriate calls to `NetworkBehaviour::inject_connection_closed`
and `NetworkBehaviour::inject_disconnected`. Furthermore,
the `disconnect()` functionality in `libp2p-core` is currently
broken as it leaves the `Pool` in an inconsistent state.

This commit does the following:

  1. When connection background tasks are dropped
     (i.e. removed from the `Manager`), they
     always terminate immediately, without attempting
     an orderly close of the connection.
  2. An orderly close is sent to the background task
     of a connection as a regular command. The
     background task emits a `Closed` event
     before terminating.
  3. `Pool::disconnect()` removes all connection
     tasks for the affected peer from the `Manager`,
     i.e. without an orderly close, thereby also
     fixing the discovered state inconsistency
     due to not removing the corresponding entries
     in the `Pool` itself after removing them from
     the `Manager`.
  4. A new test is added to `libp2p-swarm` that
     exercises the ban/unban functionality and
     places assertions on the number and order
     of calls to the `NetworkBehaviour`. In that
     context some new testing utilities have
     been added to `libp2p-swarm`.

This addresses libp2p#1584.
  • Loading branch information
Roman S. Borschel committed Jun 19, 2020
1 parent b983c94 commit 6c2b546
Show file tree
Hide file tree
Showing 12 changed files with 724 additions and 232 deletions.
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ ring = { version = "0.16.9", features = ["alloc", "std"], default-features = fal
async-std = "~1.5.0"
libp2p-mplex = { version = "0.19.0", path = "../muxers/mplex" }
libp2p-secio = { version = "0.19.0", path = "../protocols/secio" }
libp2p-tcp = { version = "0.19.0", path = "../transports/tcp" }
libp2p-tcp = { version = "0.19.0", path = "../transports/tcp", features = ["async-std"] }
quickcheck = "0.9.0"
wasm-timer = "0.2"

Expand Down
2 changes: 1 addition & 1 deletion core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandl
pub use listeners::{ListenerId, ListenersStream, ListenersEvent};
pub use manager::ConnectionId;
pub use substream::{Substream, SubstreamEndpoint, Close};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection, StartClose};

use crate::muxing::StreamMuxer;
use crate::{Multiaddr, PeerId};
Expand Down
62 changes: 42 additions & 20 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,18 +194,19 @@ pub enum Event<'a, I, O, H, TE, HE, C> {
handler: H
},

/// An established connection has encountered an error.
ConnectionError {
/// An established connection has been closed.
ConnectionClosed {
/// The connection ID.
///
/// As a result of the error, the connection has been removed
/// from the `Manager` and is being closed. Hence this ID will
/// no longer resolve to a valid entry in the manager.
/// > **Note**: Closed connections are removed from the `Manager`.
/// > Hence this ID will no longer resolve to a valid entry in
/// > the manager.
id: ConnectionId,
/// Information about the connection that encountered the error.
/// Information about the closed connection.
connected: Connected<C>,
/// The error that occurred.
error: ConnectionError<HE>,
/// The error that occurred, if any. If `None`, the connection
/// has been actively closed.
error: Option<ConnectionError<HE>>,
},

/// A connection has been established.
Expand Down Expand Up @@ -336,11 +337,11 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
/// Polls the manager for events relating to the managed connections.
pub fn poll<'a>(&'a mut self, cx: &mut Context) -> Poll<Event<'a, I, O, H, TE, HE, C>> {
// Advance the content of `local_spawns`.
while let Poll::Ready(Some(_)) = Stream::poll_next(Pin::new(&mut self.local_spawns), cx) {}
while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {}

// Poll for the first event for which the manager still has a registered task, if any.
let event = loop {
match Stream::poll_next(Pin::new(&mut self.events_rx), cx) {
match self.events_rx.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
if self.tasks.contains_key(event.id()) { // (1)
break event
Expand Down Expand Up @@ -369,18 +370,17 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
let _ = task.remove();
Event::PendingConnectionError { id, error, handler }
}
task::Event::Error { id, error } => {
task::Event::Closed { id, error } => {
let id = ConnectionId(id);
let task = task.remove();
match task.state {
TaskState::Established(connected) =>
Event::ConnectionError { id, connected, error },
Event::ConnectionClosed { id, connected, error },
TaskState::Pending => unreachable!(
"`Event::Error` implies (2) occurred on that task and thus (3)."
"`Event::Closed` implies (2) occurred on that task and thus (3)."
),
}
}

})
} else {
unreachable!("By (1)")
Expand Down Expand Up @@ -426,10 +426,11 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
/// > task _may not be notified_ if sending the event fails due to
/// > the connection handler not being ready at this time.
pub fn notify_handler(&mut self, event: I) -> Result<(), I> {
let cmd = task::Command::NotifyHandler(event);
let cmd = task::Command::NotifyHandler(event); // (*)
self.task.get_mut().sender.try_send(cmd)
.map_err(|e| match e.into_inner() {
task::Command::NotifyHandler(event) => event
task::Command::NotifyHandler(event) => event,
_ => unreachable!("by (*)")
})
}

Expand All @@ -443,6 +444,24 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
self.task.get_mut().sender.poll_ready(cx).map_err(|_| ())
}

/// Tries to send a close command to the associated background task,
/// thus initiating a graceful active close of the connection.
///
/// When the connection is ultimately closed, [`Event::ConnectionClosed`]
/// is emitted by [`Manager::poll`].
pub fn poll_start_close(&mut self, cx: &mut Context) -> Poll<()> {
match self.task.get_mut().sender.poll_ready(cx) {
Poll::Ready(result) => {
if result.is_ok() {
// If it fails now then the task is already gone.
let _ = self.task.get_mut().sender.try_send(task::Command::Close);
}
Poll::Ready(())
}
Poll::Pending => Poll::Pending
}
}

/// Obtains information about the established connection.
pub fn connected(&self) -> &Connected<C> {
match &self.task.get().state {
Expand All @@ -451,16 +470,18 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
}
}

/// Closes the connection represented by this entry,
/// returning the connection information.
pub fn close(self) -> Connected<C> {
/// Instantly removes the entry from the manager, dropping
/// the command channel to the background task of the connection,
/// which will thus drop the connection asap without an orderly
/// close or emitting another event.
pub fn remove(self) -> Connected<C> {
match self.task.remove().state {
TaskState::Established(c) => c,
TaskState::Pending => unreachable!("By Entry::new()")
}
}

/// Returns the connection id.
/// Returns the connection ID.
pub fn id(&self) -> ConnectionId {
ConnectionId(*self.task.key())
}
Expand All @@ -484,3 +505,4 @@ impl<'a, I, C> PendingEntry<'a, I, C> {
self.task.remove();
}
}

Loading

0 comments on commit 6c2b546

Please sign in to comment.