Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 6d7c73a
Merge: 2c9f0d3 25004c4
Author: Max Inden <mail@max-inden.de>
Date:   Mon Aug 23 15:04:51 2021 +0200

    Merge branch 'libp2p/master' into handler

commit 2c9f0d3
Author: Max Inden <mail@max-inden.de>
Date:   Mon Aug 23 14:53:42 2021 +0200

    protocols/: Remove unused imports

commit aa02e5f
Author: Max Inden <mail@max-inden.de>
Date:   Fri Aug 20 14:57:22 2021 +0200

    swarm/src/toggle: Fix TODOs

commit b73139a
Author: Max Inden <mail@max-inden.de>
Date:   Fri Aug 20 14:37:36 2021 +0200

    core/src/connection: Remove outdated doc comment

commit d4960b7
Author: Max Inden <mail@max-inden.de>
Date:   Fri Aug 20 14:30:40 2021 +0200

    *: Format with rustfmt

commit a78de13
Author: Max Inden <mail@max-inden.de>
Date:   Fri Aug 20 14:24:32 2021 +0200

    core/: Fix ConectionClose and PendingAborted reporting

commit 174693a
Author: Max Inden <mail@max-inden.de>
Date:   Fri Aug 20 13:44:07 2021 +0200

    swarm-derive: Adjust to changes

commit 62c5e13
Author: Max Inden <mail@max-inden.de>
Date:   Thu Aug 19 21:46:58 2021 +0200

    protocols/*: Update

commit 9262c03
Merge: 682f6be 1e9fcf9
Author: Max Inden <mail@max-inden.de>
Date:   Thu Aug 19 20:21:35 2021 +0200

    Merge branch 'libp2p/master' into handler

commit 682f6be
Author: Max Inden <mail@max-inden.de>
Date:   Thu Aug 19 19:41:10 2021 +0200

    protocols/*: Update

commit 7d9285f
Author: Max Inden <mail@max-inden.de>
Date:   Thu Aug 19 19:08:13 2021 +0200

    core/CHANGELOG: Add entry for ConnectionLimit change

commit 5ff6397
Author: Max Inden <mail@max-inden.de>
Date:   Thu Aug 19 19:01:56 2021 +0200

    core/tests: Adjust to type changes

commit 425e777
Author: Max Inden <mail@max-inden.de>
Date:   Thu Aug 19 18:50:16 2021 +0200

    core/: Emit Event::Failed on aborted pending connection

commit ce0d278
Author: Max Inden <mail@max-inden.de>
Date:   Wed Aug 18 18:03:35 2021 +0200

    core/src/connection: Report ConnectionLimit through task

commit 5c2aef6
Author: Max Inden <mail@max-inden.de>
Date:   Wed Aug 18 17:31:13 2021 +0200

    core/: Remove DisconnectedPeer::set_connected and Pool::add

    This logic seems to be a leftover of
    libp2p#889 and unused today.

commit d744b4a
Author: Max Inden <mail@max-inden.de>
Date:   Wed Aug 18 17:13:19 2021 +0200

    core/src/connection/manager: Fully close a task on disconnect

commit 705842f
Author: Max Inden <mail@max-inden.de>
Date:   Wed Aug 18 16:49:26 2021 +0200

    swarm/src/behaviour: Add default trait para on NetworkBehaviourAction

commit 595623f
Merge: d756be1 f2905c0
Author: Max Inden <mail@max-inden.de>
Date:   Wed Aug 18 15:09:08 2021 +0200

    Merge branch 'libp2p/master' into handler

commit d756be1
Author: Max Inden <mail@max-inden.de>
Date:   Thu Aug 12 17:16:20 2021 +0200

    swarm/src/behaviour: Provide handler with Dial and DialAddr

commit 46904a6
Author: Max Inden <mail@max-inden.de>
Date:   Thu Aug 12 16:51:13 2021 +0200

    swarm/: Inject handler on connection error and closed

commit a332591
Author: Max Inden <mail@max-inden.de>
Date:   Thu Aug 12 16:17:37 2021 +0200

    core/: Return handler on connection error and closed
  • Loading branch information
mxinden committed Aug 23, 2021
1 parent 82ebd5b commit 4098363
Show file tree
Hide file tree
Showing 39 changed files with 709 additions and 432 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ libp2p-pnet = { version = "0.21.0", path = "transports/pnet", optional = true }
libp2p-relay = { version = "0.4.0", path = "protocols/relay", optional = true }
libp2p-request-response = { version = "0.13.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.31.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.24.0", path = "swarm-derive" }
libp2p-swarm-derive = { version = "0.25.0", path = "swarm-derive" }
libp2p-uds = { version = "0.30.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.30.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.34.0", path = "muxers/yamux", optional = true }
Expand Down
7 changes: 6 additions & 1 deletion core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@
- Require `ConnectionHandler::{InEvent,OutEvent,Error}` to implement `Debug`
(see [PR 2183]).

- Remove `DisconnectedPeer::set_connected` and `Pool::add` (see [PR 2195]).
- Report `ConnectionLimit` error through `ConnectionError` and thus through
`NetworkEvent::ConnectionClosed` instead of previously through
`PendingConnectionError` and thus `NetworkEvent::{IncomingConnectionError,
DialError}` (see [PR 2191]).

- Remove `DisconnectedPeer::set_connected` and `Pool::add` (see [PR 2195]).

[PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145
[PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142
[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
[PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191
[PR 2195]: https://github.com/libp2p/rust-libp2p/pull/2195

# 0.29.0 [2021-07-12]
Expand Down
8 changes: 4 additions & 4 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ where
self.handler.inject_event(event);
}

/// Begins an orderly shutdown of the connection, returning a
/// `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> Close<TMuxer> {
self.muxing.close().0
/// Begins an orderly shutdown of the connection, returning the connection
/// handler and a `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> (THandler, Close<TMuxer>) {
(self.handler, self.muxing.close().0)
}

/// Polls the connection for events produced by the associated handler
Expand Down
21 changes: 13 additions & 8 deletions core/src/connection/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub enum ConnectionError<THandlerErr> {
// TODO: Eventually this should also be a custom error?
IO(io::Error),

/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// The connection handler produced an error.
Handler(THandlerErr),
}
Expand All @@ -41,6 +45,9 @@ where
match self {
ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err),
ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err),
ConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
}
}
}
Expand All @@ -53,6 +60,7 @@ where
match self {
ConnectionError::IO(err) => Some(err),
ConnectionError::Handler(err) => Some(err),
ConnectionError::ConnectionLimit(..) => None,
}
}
}
Expand All @@ -63,14 +71,13 @@ pub enum PendingConnectionError<TTransErr> {
/// An error occurred while negotiating the transport protocol(s).
Transport(TransportError<TTransErr>),

/// Pending connection attempt has been aborted.
Aborted,

/// The peer identity obtained on the connection did not
/// match the one that was expected or is otherwise invalid.
InvalidPeerId,

/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// An I/O error occurred on the connection.
// TODO: Eventually this should also be a custom error?
IO(io::Error),
Expand All @@ -83,15 +90,13 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PendingConnectionError::IO(err) => write!(f, "Pending connection: I/O error: {}", err),
PendingConnectionError::Aborted => write!(f, "Pending connection: Aborted."),
PendingConnectionError::Transport(err) => {
write!(f, "Pending connection: Transport error: {}", err)
}
PendingConnectionError::InvalidPeerId => {
write!(f, "Pending connection: Invalid peer ID.")
}
PendingConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
}
}
}
Expand All @@ -105,7 +110,7 @@ where
PendingConnectionError::IO(err) => Some(err),
PendingConnectionError::Transport(err) => Some(err),
PendingConnectionError::InvalidPeerId => None,
PendingConnectionError::ConnectionLimit(..) => None,
PendingConnectionError::Aborted => None,
}
}
}
25 changes: 8 additions & 17 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

use super::{
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Connected, ConnectedPoint, ConnectionError, ConnectionHandler, IntoConnectionHandler,
PendingConnectionError, Substream,
Connected, ConnectedPoint, ConnectionError, ConnectionHandler, ConnectionLimit,
IntoConnectionHandler, PendingConnectionError, Substream,
};
use crate::{muxing::StreamMuxer, Executor};
use fnv::FnvHashMap;
Expand Down Expand Up @@ -192,6 +192,7 @@ pub enum Event<'a, H: IntoConnectionHandler, TE> {
/// The error that occurred, if any. If `None`, the connection
/// has been actively closed.
error: Option<ConnectionError<THandlerError<H>>>,
handler: H::Handler,
},

/// A connection has been established.
Expand Down Expand Up @@ -350,14 +351,15 @@ impl<H: IntoConnectionHandler, TE> Manager<H, TE> {
new_endpoint: new,
}
}
task::Event::Closed { id, error } => {
task::Event::Closed { id, error, handler } => {
let id = ConnectionId(id);
let task = task.remove();
match task.state {
TaskState::Established(connected) => Event::ConnectionClosed {
id,
connected,
error,
handler,
},
TaskState::Pending => unreachable!(
"`Event::Closed` implies (2) occurred on that task and thus (3)."
Expand Down Expand Up @@ -431,21 +433,21 @@ impl<'a, I> EstablishedEntry<'a, I> {
}

/// Sends a close command to the associated background task,
/// thus initiating a graceful active close of the connection.
/// thus initiating a graceful active close of the connectione
///
/// Has no effect if the connection is already closing.
///
/// When the connection is ultimately closed, [`Event::ConnectionClosed`]
/// is emitted by [`Manager::poll`].
pub fn start_close(mut self) {
pub fn start_close(mut self, error: Option<ConnectionLimit>) {
// Clone the sender so that we are guaranteed to have
// capacity for the close command (every sender gets a slot).
match self
.task
.get_mut()
.sender
.clone()
.try_send(task::Command::Close)
.try_send(task::Command::Close(error))
{
Ok(()) => {}
Err(e) => assert!(e.is_disconnected(), "No capacity for close command."),
Expand All @@ -460,17 +462,6 @@ impl<'a, I> EstablishedEntry<'a, I> {
}
}

/// Instantly removes the entry from the manager, dropping
/// the command channel to the background task of the connection,
/// which will thus drop the connection asap without an orderly
/// close or emitting another event.
pub fn remove(self) -> Connected {
match self.task.remove().state {
TaskState::Established(c) => c,
TaskState::Pending => unreachable!("By Entry::new()"),
}
}

/// Returns the connection ID.
pub fn id(&self) -> ConnectionId {
ConnectionId(*self.task.key())
Expand Down
57 changes: 45 additions & 12 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::{
connection::{
self,
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Close, Connected, Connection, ConnectionError, ConnectionHandler, IntoConnectionHandler,
PendingConnectionError, Substream,
Close, Connected, Connection, ConnectionError, ConnectionHandler, ConnectionLimit,
IntoConnectionHandler, PendingConnectionError, Substream,
},
muxing::StreamMuxer,
Multiaddr,
Expand All @@ -43,7 +43,7 @@ pub enum Command<T> {
NotifyHandler(T),
/// Gracefully close the connection (active close) before
/// terminating the task.
Close,
Close(Option<ConnectionLimit>),
}

/// Events that a task can emit to its manager.
Expand Down Expand Up @@ -71,6 +71,7 @@ pub enum Event<H: IntoConnectionHandler, TE> {
Closed {
id: TaskId,
error: Option<ConnectionError<THandlerError<H>>>,
handler: H::Handler,
},
}

Expand Down Expand Up @@ -159,7 +160,11 @@ where
},

/// The connection is closing (active close).
Closing(Close<M>),
Closing {
closing_muxer: Close<M>,
handler: H::Handler,
error: Option<ConnectionLimit>,
},

/// The task is terminating with a final event for the `Manager`.
Terminating(Event<H, E>),
Expand Down Expand Up @@ -204,7 +209,16 @@ where
Poll::Pending => {}
Poll::Ready(None) => {
// The manager has dropped the task; abort.
return Poll::Ready(());
// Don't accept any further commands and terminate the
// task with a final event.
this.commands.get_mut().close();
let event = Event::Failed {
id,
handler,
error: PendingConnectionError::Aborted,
};
this.state = State::Terminating(event);
continue 'poll;
}
Poll::Ready(Some(_)) => {
panic!("Task received command while the connection is pending.")
Expand Down Expand Up @@ -243,16 +257,23 @@ where
Poll::Ready(Some(Command::NotifyHandler(event))) => {
connection.inject_event(event)
}
Poll::Ready(Some(Command::Close)) => {
Poll::Ready(Some(Command::Close(error))) => {
// Don't accept any further commands.
this.commands.get_mut().close();
// Discard the event, if any, and start a graceful close.
this.state = State::Closing(connection.close());
let (handler, closing_muxer) = connection.close();
this.state = State::Closing {
handler,
closing_muxer,
error,
};
continue 'poll;
}
Poll::Ready(None) => {
todo!("Safe to assume that this should never happen?");
// The manager has dropped the task or disappeared; abort.
return Poll::Ready(());
// TODO: Should we return the handler in this case?
// return Poll::Ready(());
}
}
}
Expand Down Expand Up @@ -306,36 +327,48 @@ where
Poll::Ready(Err(error)) => {
// Don't accept any further commands.
this.commands.get_mut().close();
let (handler, _closing_muxer) = connection.close();
// Terminate the task with the error, dropping the connection.
let event = Event::Closed {
id,
error: Some(error),
handler,
};
this.state = State::Terminating(event);
}
}
}
}

State::Closing(mut closing) => {
State::Closing {
handler,
error,
mut closing_muxer,
} => {
// Try to gracefully close the connection.
match closing.poll_unpin(cx) {
match closing_muxer.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
let event = Event::Closed {
id: this.id,
error: None,
error: error.map(|limit| ConnectionError::ConnectionLimit(limit)),
handler,
};
this.state = State::Terminating(event);
}
Poll::Ready(Err(e)) => {
let event = Event::Closed {
id: this.id,
error: Some(ConnectionError::IO(e)),
handler,
};
this.state = State::Terminating(event);
}
Poll::Pending => {
this.state = State::Closing(closing);
this.state = State::Closing {
handler,
error,
closing_muxer,
};
return Poll::Pending;
}
}
Expand Down
Loading

0 comments on commit 4098363

Please sign in to comment.