Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{core/,swarm/}: Dial with handler and return handler on error and closed #2191

Merged
merged 43 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
a332591
core/: Return handler on connection error and closed
mxinden Aug 12, 2021
46904a6
swarm/: Inject handler on connection error and closed
mxinden Aug 12, 2021
d756be1
swarm/src/behaviour: Provide handler with Dial and DialAddr
mxinden Aug 12, 2021
595623f
Merge branch 'libp2p/master' into handler
mxinden Aug 18, 2021
705842f
swarm/src/behaviour: Add default trait para on NetworkBehaviourAction
mxinden Aug 18, 2021
d744b4a
core/src/connection/manager: Fully close a task on disconnect
mxinden Aug 18, 2021
5c2aef6
core/: Remove DisconnectedPeer::set_connected and Pool::add
mxinden Aug 18, 2021
ce0d278
core/src/connection: Report ConnectionLimit through task
mxinden Aug 18, 2021
425e777
core/: Emit Event::Failed on aborted pending connection
mxinden Aug 19, 2021
5ff6397
core/tests: Adjust to type changes
mxinden Aug 19, 2021
7d9285f
core/CHANGELOG: Add entry for ConnectionLimit change
mxinden Aug 19, 2021
682f6be
protocols/*: Update
mxinden Aug 19, 2021
9262c03
Merge branch 'libp2p/master' into handler
mxinden Aug 19, 2021
62c5e13
protocols/*: Update
mxinden Aug 19, 2021
174693a
swarm-derive: Adjust to changes
mxinden Aug 20, 2021
a78de13
core/: Fix ConectionClose and PendingAborted reporting
mxinden Aug 20, 2021
d4960b7
*: Format with rustfmt
mxinden Aug 20, 2021
b73139a
core/src/connection: Remove outdated doc comment
mxinden Aug 20, 2021
aa02e5f
swarm/src/toggle: Fix TODOs
mxinden Aug 20, 2021
2c9f0d3
protocols/: Remove unused imports
mxinden Aug 23, 2021
6d7c73a
Merge branch 'libp2p/master' into handler
mxinden Aug 23, 2021
a56980e
core/src/network/event: Use NoZeroU32
mxinden Aug 24, 2021
1c3ed2e
swarm/src/protocols_handler: Rename to into_protocols_handler
mxinden Aug 24, 2021
32fc84e
swarm/src/behaviour: Introduce NetworkBehaviour::inject_listen_failure
mxinden Aug 24, 2021
7ea4908
swarm/src/lib: Inject handler on DialPeerCondition false
mxinden Aug 24, 2021
fbd4681
core/src/connection: Assume manager to always close handler
mxinden Aug 25, 2021
6787e77
swarm-derive: Add comments
mxinden Aug 25, 2021
b864133
swarm: Add documentation
mxinden Aug 25, 2021
b2bf380
*: Format with rustfmt
mxinden Aug 25, 2021
7d342b6
swar/src/behaviour: Link to NotifyHandler not SendEvent
mxinden Aug 25, 2021
5853890
*: Update changelogs
mxinden Aug 25, 2021
4d0faf9
swarm-derive: Fix typo
mxinden Aug 25, 2021
7a45e7b
Apply suggestions from code review
mxinden Aug 26, 2021
cef949c
core/src/network: Revert map_err
mxinden Aug 26, 2021
ceb77e5
core/src/network: Use custom method on DialAttemptsRemaining
mxinden Aug 26, 2021
814ff4b
swarm: Add doc example for carrying state in handler
mxinden Aug 26, 2021
90df72a
Merge branch 'libp2p/master' into handler
mxinden Aug 26, 2021
60c7261
swarm/src/lib: Remove use_handler_to_carry_state
mxinden Aug 30, 2021
a2f1819
core/tests/network_dial_error: Use get_attempts
mxinden Aug 30, 2021
b905545
swarm/src/behaviour.rs: Use heading for doc example
mxinden Aug 30, 2021
99f81d0
core/tests: Format with rustfmt
mxinden Aug 30, 2021
1c5eb8e
Merge branch 'master' into handler
mxinden Aug 30, 2021
c09198e
protocols/gossipsub/src/behaviour.rs: Remove unnecesary assignment
mxinden Aug 31, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true }
libp2p-relay = { version = "0.4.0", path = "protocols/relay", optional = true }
libp2p-request-response = { version = "0.13.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.31.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.24.0", path = "swarm-derive" }
libp2p-swarm-derive = { version = "0.25.0", path = "swarm-derive" }
libp2p-uds = { version = "0.30.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.30.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.34.0", path = "muxers/yamux", optional = true }
Expand Down
8 changes: 8 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,19 @@

- Remove `DisconnectedPeer::set_connected` and `Pool::add` (see [PR 2195]).

- Report `ConnectionLimit` error through `ConnectionError` and thus through
`NetworkEvent::ConnectionClosed` instead of previously through
`PendingConnectionError` and thus `NetworkEvent::{IncomingConnectionError,
DialError}` (see [PR 2191]).

- Report abortion of pending connection through `DialError`,
`UnknownPeerDialError` or `IncomingConnectionError` (see [PR 2191]).

[PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145
[PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142
[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
[PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191
[PR 2195]: https://github.com/libp2p/rust-libp2p/pull/2195

# 0.29.0 [2021-07-12]
Expand Down
8 changes: 4 additions & 4 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ where
self.handler.inject_event(event);
}

/// Begins an orderly shutdown of the connection, returning a
/// `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> Close<TMuxer> {
self.muxing.close().0
/// Begins an orderly shutdown of the connection, returning the connection
/// handler and a `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> (THandler, Close<TMuxer>) {
(self.handler, self.muxing.close().0)
}

/// Polls the connection for events produced by the associated handler
Expand Down
21 changes: 13 additions & 8 deletions core/src/connection/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub enum ConnectionError<THandlerErr> {
// TODO: Eventually this should also be a custom error?
IO(io::Error),

/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// The connection handler produced an error.
Handler(THandlerErr),
}
Expand All @@ -41,6 +45,9 @@ where
match self {
ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err),
ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err),
ConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
}
}
}
Expand All @@ -53,6 +60,7 @@ where
match self {
ConnectionError::IO(err) => Some(err),
ConnectionError::Handler(err) => Some(err),
ConnectionError::ConnectionLimit(..) => None,
}
}
}
Expand All @@ -63,14 +71,13 @@ pub enum PendingConnectionError<TTransErr> {
/// An error occurred while negotiating the transport protocol(s).
Transport(TransportError<TTransErr>),

/// Pending connection attempt has been aborted.
Aborted,

/// The peer identity obtained on the connection did not
/// match the one that was expected or is otherwise invalid.
InvalidPeerId,

/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// An I/O error occurred on the connection.
// TODO: Eventually this should also be a custom error?
IO(io::Error),
Expand All @@ -83,15 +90,13 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PendingConnectionError::IO(err) => write!(f, "Pending connection: I/O error: {}", err),
PendingConnectionError::Aborted => write!(f, "Pending connection: Aborted."),
PendingConnectionError::Transport(err) => {
write!(f, "Pending connection: Transport error: {}", err)
}
PendingConnectionError::InvalidPeerId => {
write!(f, "Pending connection: Invalid peer ID.")
}
PendingConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
}
}
}
Expand All @@ -105,7 +110,7 @@ where
PendingConnectionError::IO(err) => Some(err),
PendingConnectionError::Transport(err) => Some(err),
PendingConnectionError::InvalidPeerId => None,
PendingConnectionError::ConnectionLimit(..) => None,
PendingConnectionError::Aborted => None,
}
}
}
23 changes: 7 additions & 16 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

use super::{
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Connected, ConnectedPoint, ConnectionError, ConnectionHandler, IntoConnectionHandler,
PendingConnectionError, Substream,
Connected, ConnectedPoint, ConnectionError, ConnectionHandler, ConnectionLimit,
IntoConnectionHandler, PendingConnectionError, Substream,
};
use crate::{muxing::StreamMuxer, Executor};
use fnv::FnvHashMap;
Expand Down Expand Up @@ -192,6 +192,7 @@ pub enum Event<'a, H: IntoConnectionHandler, TE> {
/// The error that occurred, if any. If `None`, the connection
/// has been actively closed.
error: Option<ConnectionError<THandlerError<H>>>,
handler: H::Handler,
},

/// A connection has been established.
Expand Down Expand Up @@ -350,14 +351,15 @@ impl<H: IntoConnectionHandler, TE> Manager<H, TE> {
new_endpoint: new,
}
}
task::Event::Closed { id, error } => {
task::Event::Closed { id, error, handler } => {
let id = ConnectionId(id);
let task = task.remove();
match task.state {
TaskState::Established(connected) => Event::ConnectionClosed {
id,
connected,
error,
handler,
},
TaskState::Pending => unreachable!(
"`Event::Closed` implies (2) occurred on that task and thus (3)."
Expand Down Expand Up @@ -437,15 +439,15 @@ impl<'a, I> EstablishedEntry<'a, I> {
///
/// When the connection is ultimately closed, [`Event::ConnectionClosed`]
/// is emitted by [`Manager::poll`].
pub fn start_close(mut self) {
pub fn start_close(mut self, error: Option<ConnectionLimit>) {
// Clone the sender so that we are guaranteed to have
// capacity for the close command (every sender gets a slot).
match self
.task
.get_mut()
.sender
.clone()
.try_send(task::Command::Close)
.try_send(task::Command::Close(error))
{
Ok(()) => {}
Err(e) => assert!(e.is_disconnected(), "No capacity for close command."),
Expand All @@ -460,17 +462,6 @@ impl<'a, I> EstablishedEntry<'a, I> {
}
}

/// Instantly removes the entry from the manager, dropping
/// the command channel to the background task of the connection,
/// which will thus drop the connection asap without an orderly
/// close or emitting another event.
pub fn remove(self) -> Connected {
match self.task.remove().state {
TaskState::Established(c) => c,
TaskState::Pending => unreachable!("By Entry::new()"),
}
}

/// Returns the connection ID.
pub fn id(&self) -> ConnectionId {
ConnectionId(*self.task.key())
Expand Down
55 changes: 43 additions & 12 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::{
connection::{
self,
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Close, Connected, Connection, ConnectionError, ConnectionHandler, IntoConnectionHandler,
PendingConnectionError, Substream,
Close, Connected, Connection, ConnectionError, ConnectionHandler, ConnectionLimit,
IntoConnectionHandler, PendingConnectionError, Substream,
},
muxing::StreamMuxer,
Multiaddr,
Expand All @@ -43,7 +43,7 @@ pub enum Command<T> {
NotifyHandler(T),
/// Gracefully close the connection (active close) before
/// terminating the task.
Close,
Close(Option<ConnectionLimit>),
}

/// Events that a task can emit to its manager.
Expand Down Expand Up @@ -71,6 +71,7 @@ pub enum Event<H: IntoConnectionHandler, TE> {
Closed {
id: TaskId,
error: Option<ConnectionError<THandlerError<H>>>,
handler: H::Handler,
},
}

Expand Down Expand Up @@ -159,7 +160,11 @@ where
},

/// The connection is closing (active close).
Closing(Close<M>),
Closing {
closing_muxer: Close<M>,
handler: H::Handler,
error: Option<ConnectionLimit>,
},

/// The task is terminating with a final event for the `Manager`.
Terminating(Event<H, E>),
Expand Down Expand Up @@ -204,7 +209,16 @@ where
Poll::Pending => {}
Poll::Ready(None) => {
// The manager has dropped the task; abort.
return Poll::Ready(());
// Don't accept any further commands and terminate the
// task with a final event.
this.commands.get_mut().close();
let event = Event::Failed {
id,
handler,
error: PendingConnectionError::Aborted,
};
this.state = State::Terminating(event);
continue 'poll;
}
Poll::Ready(Some(_)) => {
panic!("Task received command while the connection is pending.")
Expand Down Expand Up @@ -243,15 +257,20 @@ where
Poll::Ready(Some(Command::NotifyHandler(event))) => {
connection.inject_event(event)
}
Poll::Ready(Some(Command::Close)) => {
Poll::Ready(Some(Command::Close(error))) => {
// Don't accept any further commands.
this.commands.get_mut().close();
// Discard the event, if any, and start a graceful close.
this.state = State::Closing(connection.close());
let (handler, closing_muxer) = connection.close();
this.state = State::Closing {
handler,
closing_muxer,
error,
};
continue 'poll;
}
Poll::Ready(None) => {
// The manager has dropped the task or disappeared; abort.
// The manager has disappeared; abort.
return Poll::Ready(());
}
}
Expand Down Expand Up @@ -306,36 +325,48 @@ where
Poll::Ready(Err(error)) => {
// Don't accept any further commands.
this.commands.get_mut().close();
let (handler, _closing_muxer) = connection.close();
// Terminate the task with the error, dropping the connection.
let event = Event::Closed {
id,
error: Some(error),
handler,
};
this.state = State::Terminating(event);
}
}
}
}

State::Closing(mut closing) => {
State::Closing {
handler,
error,
mut closing_muxer,
} => {
// Try to gracefully close the connection.
match closing.poll_unpin(cx) {
match closing_muxer.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
let event = Event::Closed {
id: this.id,
error: None,
error: error.map(|limit| ConnectionError::ConnectionLimit(limit)),
handler,
};
this.state = State::Terminating(event);
}
Poll::Ready(Err(e)) => {
let event = Event::Closed {
id: this.id,
error: Some(ConnectionError::IO(e)),
handler,
};
this.state = State::Terminating(event);
}
Poll::Pending => {
this.state = State::Closing(closing);
this.state = State::Closing {
handler,
error,
closing_muxer,
};
return Poll::Pending;
}
}
Expand Down
Loading