Skip to content

Commit

Permalink
swarm/: Inject handler on connection error and closed
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Aug 12, 2021
1 parent a332591 commit 46904a6
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 16 deletions.
9 changes: 9 additions & 0 deletions core/src/network/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ pub enum DialAttemptsRemaining<THandler> {
None(THandler),
}

impl<THandler> From<&DialAttemptsRemaining<THandler>> for u32 {
fn from(attempts_remaining: &DialAttemptsRemaining<THandler>) -> Self {
match attempts_remaining {
DialAttemptsRemaining::Some(attempts) => *attempts,
DialAttemptsRemaining::None(_) => 0,
}
}
}

impl<TTrans, TInEvent, TOutEvent, THandler> fmt::Debug
for NetworkEvent<'_, TTrans, TInEvent, TOutEvent, THandler>
where
Expand Down
4 changes: 2 additions & 2 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub trait NetworkBehaviour: Send + 'static {
/// A call to this method is always paired with an earlier call to
/// `inject_connection_established` with the same peer ID, connection ID and
/// endpoint.
fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {}
fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint, _: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler) {}

/// Informs the behaviour that the [`ConnectedPoint`] of an existing connection has changed.
fn inject_address_change(
Expand Down Expand Up @@ -153,7 +153,7 @@ pub trait NetworkBehaviour: Send + 'static {
///
/// The `peer_id` is guaranteed to be in a disconnected state. In other words,
/// `inject_connected` has not been called, or `inject_disconnected` has been called since then.
fn inject_dial_failure(&mut self, _peer_id: &PeerId) {}
fn inject_dial_failure(&mut self, _peer_id: &PeerId, _handler: Self::ProtocolsHandler) {}

/// Indicates to the behaviour that a new listener was created.
fn inject_new_listener(&mut self, _id: ListenerId) {}
Expand Down
38 changes: 27 additions & 11 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ use libp2p_core::{
},
muxing::StreamMuxerBox,
network::{
self, peer::ConnectedPeer, ConnectionLimits, Network, NetworkConfig, NetworkEvent,
NetworkInfo,
self, peer::ConnectedPeer, ConnectionLimits, DialAttemptsRemaining, Network, NetworkConfig,
NetworkEvent, NetworkInfo,
},
transport::{self, TransportError},
upgrade::ProtocolName,
Expand Down Expand Up @@ -342,7 +342,8 @@ where
/// Initiates a new dialing attempt to the given peer.
pub fn dial(&mut self, peer_id: &PeerId) -> Result<(), DialError> {
if self.banned_peers.contains(peer_id) {
self.behaviour.inject_dial_failure(peer_id);
// TODO: Needed?
// self.behaviour.inject_dial_failure(peer_id);
return Err(DialError::Banned);
}

Expand Down Expand Up @@ -374,7 +375,8 @@ where
peer_id,
error
);
self.behaviour.inject_dial_failure(&peer_id);
// TODO: Needed?
// self.behaviour.inject_dial_failure(&peer_id);
}

result
Expand Down Expand Up @@ -568,6 +570,7 @@ where
connected,
error,
num_established,
handler,
}) => {
if let Some(error) = error.as_ref() {
log::debug!("Connection {:?} closed: {:?}", connected, error);
Expand All @@ -576,8 +579,12 @@ where
}
let peer_id = connected.peer_id;
let endpoint = connected.endpoint;
this.behaviour
.inject_connection_closed(&peer_id, &id, &endpoint);
this.behaviour.inject_connection_closed(
&peer_id,
&id,
&endpoint,
handler.into_protocol_handler(),
);
if num_established == 0 {
this.behaviour.inject_disconnected(&peer_id);
}
Expand Down Expand Up @@ -668,7 +675,9 @@ where
local_addr,
send_back_addr,
error,
handler: _,
}) => {
// TODO: Should handler not be injected into behaviour?
log::debug!("Incoming connection failed: {:?}", error);
return Poll::Ready(SwarmEvent::IncomingConnectionError {
local_addr,
Expand All @@ -684,17 +693,21 @@ where
}) => {
log::debug!(
"Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.",
peer_id, multiaddr, error, attempts_remaining);
// TODO: Can we do better on conversion?
peer_id, multiaddr, error, Into::<u32>::into(&attempts_remaining));
this.behaviour
.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error);
if attempts_remaining == 0 {
this.behaviour.inject_dial_failure(&peer_id);
let attempts_remaining_num = (&attempts_remaining).into();
if let DialAttemptsRemaining::None(handler) = attempts_remaining {
this.behaviour
.inject_dial_failure(&peer_id, handler.into_protocol_handler());
}
return Poll::Ready(SwarmEvent::UnreachableAddr {
peer_id,
address: multiaddr,
error,
attempts_remaining,
// TODO: Can we do better?
attempts_remaining: attempts_remaining_num,
});
}
Poll::Ready(NetworkEvent::UnknownPeerDialError {
Expand Down Expand Up @@ -766,7 +779,10 @@ where
}
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => {
if this.banned_peers.contains(&peer_id) {
this.behaviour.inject_dial_failure(&peer_id);
this.behaviour.inject_dial_failure(
&peer_id,
todo!("Have DialPeer contain handler which can then be returned."),
);
} else {
let condition_matched = match condition {
DialPeerCondition::Disconnected => {
Expand Down
13 changes: 13 additions & 0 deletions swarm/src/protocols_handler/node_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ where
self.substream_upgrade_protocol_override = version;
self
}

// TODO: Rethink this method.
pub(crate) fn into_protocol_handler(self) -> TIntoProtoHandler {
self.handler
}
}

impl<TIntoProtoHandler, TProtoHandler> IntoConnectionHandler
Expand Down Expand Up @@ -130,6 +135,14 @@ where
substream_upgrade_protocol_override: Option<upgrade::Version>,
}

impl<TProtoHandler: ProtocolsHandler> NodeHandlerWrapper<TProtoHandler> {
// TODO: Should this be a From impl?
// TODO: Find better name.
pub fn into_protocol_handler(self) -> TProtoHandler {
self.handler
}
}

struct SubstreamUpgrade<UserData, Upgrade> {
user_data: Option<UserData>,
timeout: Delay,
Expand Down
9 changes: 6 additions & 3 deletions swarm/src/toggle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,11 @@ where
peer_id: &PeerId,
connection: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_connection_closed(peer_id, connection, endpoint)
// TODO: Unwrap safe here?
inner.inject_connection_closed(peer_id, connection, endpoint, handler.inner.unwrap())
}
}

Expand Down Expand Up @@ -153,9 +155,10 @@ where
}
}

fn inject_dial_failure(&mut self, peer_id: &PeerId) {
fn inject_dial_failure(&mut self, peer_id: &PeerId, handler: Self::ProtocolsHandler) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_dial_failure(peer_id)
// TODO: Unwrap safe here?
inner.inject_dial_failure(peer_id, handler.inner.unwrap())
}
}

Expand Down

0 comments on commit 46904a6

Please sign in to comment.