Skip to content

Commit

Permalink
swarm/src/lib: Extract PoolEvent handling into new method
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed May 2, 2022
1 parent 3dd27e1 commit 9f0ca95
Showing 1 changed file with 171 additions and 157 deletions.
328 changes: 171 additions & 157 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ where
};
match dial {
Ok(fut) => fut
.map(|r| (address, r.map_err(TransportError::Other)))
.map(|r| (address, r.map_err(|e| TransportError::Other(e))))
.boxed(),
Err(err) => futures::future::ready((address, Err(err))).boxed(),
}
Expand All @@ -538,7 +538,7 @@ where
Err((connection_limit, handler)) => {
let error = DialError::ConnectionLimit(connection_limit);
self.behaviour.inject_dial_failure(None, handler, &error);
Err(error)
return Err(error);
}
}
}
Expand Down Expand Up @@ -664,6 +664,171 @@ where
&mut self.behaviour
}

fn handle_pool_event(
&mut self,
event: PoolEvent<THandler<TBehaviour>, transport::Boxed<(PeerId, StreamMuxerBox)>>,
) -> Option<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> {
match event {
PoolEvent::ConnectionEstablished {
peer_id,
id,
endpoint,
other_established_connection_ids,
concurrent_dial_errors,
} => {
if self.banned_peers.contains(&peer_id) {
// Mark the connection for the banned peer as banned, thus withholding any
// future events from the connection to the behaviour.
self.banned_peer_connections.insert(id);
self.pool.disconnect(peer_id);
return Some(SwarmEvent::BannedPeer { peer_id, endpoint });
} else {
let num_established = NonZeroU32::new(
u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
)
.expect("n + 1 is always non-zero; qed");
let non_banned_established = other_established_connection_ids
.into_iter()
.filter(|conn_id| !self.banned_peer_connections.contains(&conn_id))
.count();

log::debug!(
"Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}",
peer_id,
endpoint,
num_established,
non_banned_established + 1,
);
let endpoint = endpoint.clone();
let failed_addresses = concurrent_dial_errors
.as_ref()
.map(|es| es.iter().map(|(a, _)| a).cloned().collect());
self.behaviour.inject_connection_established(
&peer_id,
&id,
&endpoint,
failed_addresses.as_ref(),
non_banned_established,
);
return Some(SwarmEvent::ConnectionEstablished {
peer_id,
num_established,
endpoint,
concurrent_dial_errors,
});
}
}
PoolEvent::PendingOutboundConnectionError {
id: _,
error,
handler,
peer,
} => {
let error = error.into();

self.behaviour.inject_dial_failure(peer, handler, &error);

if let Some(peer) = peer {
log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,);
} else {
log::debug!("Connection attempt to unknown peer failed with {:?}", error);
}

return Some(SwarmEvent::OutgoingConnectionError {
peer_id: peer,
error,
});
}
PoolEvent::PendingInboundConnectionError {
id: _,
send_back_addr,
local_addr,
error,
handler,
} => {
log::debug!("Incoming connection failed: {:?}", error);
self.behaviour
.inject_listen_failure(&local_addr, &send_back_addr, handler);
return Some(SwarmEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
});
}
PoolEvent::ConnectionClosed {
id,
connected,
error,
remaining_established_connection_ids,
handler,
..
} => {
if let Some(error) = error.as_ref() {
log::debug!(
"Connection closed with error {:?}: {:?}; Total (peer): {}.",
error,
connected,
remaining_established_connection_ids.len()
);
} else {
log::debug!(
"Connection closed: {:?}; Total (peer): {}.",
connected,
remaining_established_connection_ids.len()
);
}
let peer_id = connected.peer_id;
let endpoint = connected.endpoint;
let num_established =
u32::try_from(remaining_established_connection_ids.len()).unwrap();
let conn_was_reported = !self.banned_peer_connections.remove(&id);
if conn_was_reported {
let remaining_non_banned = remaining_established_connection_ids
.into_iter()
.filter(|conn_id| !self.banned_peer_connections.contains(&conn_id))
.count();
self.behaviour.inject_connection_closed(
&peer_id,
&id,
&endpoint,
handler,
remaining_non_banned,
);
}
return Some(SwarmEvent::ConnectionClosed {
peer_id,
endpoint,
cause: error,
num_established,
});
}
PoolEvent::ConnectionEvent { peer_id, id, event } => {
if self.banned_peer_connections.contains(&id) {
log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id);
} else {
self.behaviour.inject_event(peer_id, id, event);
}
}
PoolEvent::AddressChange {
peer_id,
id,
new_endpoint,
old_endpoint,
} => {
if !self.banned_peer_connections.contains(&id) {
self.behaviour.inject_address_change(
&peer_id,
&id,
&old_endpoint,
&new_endpoint,
);
}
}
}

None
}

/// Internal function used by everything event-related.
///
/// Polls the `Swarm` for the next event.
Expand Down Expand Up @@ -777,161 +942,10 @@ where

// Poll the known peers.
match this.pool.poll(cx) {
Poll::Pending => {
connections_not_ready = true;
}
Poll::Ready(PoolEvent::ConnectionEstablished {
id,
peer_id,
endpoint,
other_established_connection_ids,
concurrent_dial_errors,
}) => {
if this.banned_peers.contains(&peer_id) {
// Mark the connection for the banned peer as banned, thus withholding any
// future events from the connection to the behaviour.
this.banned_peer_connections.insert(id);
this.pool.disconnect(peer_id);
return Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint });
} else {
let num_established = NonZeroU32::new(
u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
)
.expect("n + 1 is always non-zero; qed");
let non_banned_established = other_established_connection_ids
.into_iter()
.filter(|conn_id| !this.banned_peer_connections.contains(conn_id))
.count();

log::debug!(
"Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}",
peer_id,
endpoint,
num_established,
non_banned_established + 1,
);
let failed_addresses = concurrent_dial_errors
.as_ref()
.map(|es| es.iter().map(|(a, _)| a).cloned().collect());
this.behaviour.inject_connection_established(
&peer_id,
&id,
&endpoint,
failed_addresses.as_ref(),
non_banned_established,
);
return Poll::Ready(SwarmEvent::ConnectionEstablished {
peer_id,
num_established,
endpoint,
concurrent_dial_errors,
});
}
}
Poll::Ready(PoolEvent::PendingOutboundConnectionError {
id: _,
error,
handler,
peer,
}) => {
let error = error.into();

this.behaviour.inject_dial_failure(peer, handler, &error);

if let Some(peer) = peer {
log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,);
} else {
log::debug!("Connection attempt to unknown peer failed with {:?}", error);
}

return Poll::Ready(SwarmEvent::OutgoingConnectionError {
peer_id: peer,
error,
});
}
Poll::Ready(PoolEvent::PendingInboundConnectionError {
id: _,
send_back_addr,
local_addr,
error,
handler,
}) => {
log::debug!("Incoming connection failed: {:?}", error);
this.behaviour
.inject_listen_failure(&local_addr, &send_back_addr, handler);
return Poll::Ready(SwarmEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
});
}
Poll::Ready(PoolEvent::ConnectionClosed {
id,
connected,
error,
remaining_established_connection_ids,
handler,
..
}) => {
if let Some(error) = error.as_ref() {
log::debug!(
"Connection closed with error {:?}: {:?}; Total (peer): {}.",
error,
connected,
remaining_established_connection_ids.len()
);
} else {
log::debug!(
"Connection closed: {:?}; Total (peer): {}.",
connected,
remaining_established_connection_ids.len()
);
}
let peer_id = connected.peer_id;
let endpoint = connected.endpoint;
let num_established =
u32::try_from(remaining_established_connection_ids.len()).unwrap();
let conn_was_reported = !this.banned_peer_connections.remove(&id);
if conn_was_reported {
let remaining_non_banned = remaining_established_connection_ids
.into_iter()
.filter(|conn_id| !this.banned_peer_connections.contains(conn_id))
.count();
this.behaviour.inject_connection_closed(
&peer_id,
&id,
&endpoint,
handler,
remaining_non_banned,
);
}
return Poll::Ready(SwarmEvent::ConnectionClosed {
peer_id,
endpoint,
cause: error,
num_established,
});
}
Poll::Ready(PoolEvent::ConnectionEvent { id, peer_id, event }) => {
if this.banned_peer_connections.contains(&id) {
log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id);
} else {
this.behaviour.inject_event(peer_id, id, event);
}
}
Poll::Ready(PoolEvent::AddressChange {
id,
peer_id,
new_endpoint,
old_endpoint,
}) => {
if !this.banned_peer_connections.contains(&id) {
this.behaviour.inject_address_change(
&peer_id,
&id,
&old_endpoint,
&new_endpoint,
);
Poll::Pending => connections_not_ready = true,
Poll::Ready(pool_event) => {
if let Some(swarm_event) = this.handle_pool_event(pool_event) {
return Poll::Ready(swarm_event);
}
}
};
Expand Down

0 comments on commit 9f0ca95

Please sign in to comment.