diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 27eff34eb24..6d81b82bda6 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -514,7 +514,7 @@ where }; match dial { Ok(fut) => fut - .map(|r| (address, r.map_err(TransportError::Other))) + .map(|r| (address, r.map_err(|e| TransportError::Other(e)))) .boxed(), Err(err) => futures::future::ready((address, Err(err))).boxed(), } @@ -538,7 +538,7 @@ where Err((connection_limit, handler)) => { let error = DialError::ConnectionLimit(connection_limit); self.behaviour.inject_dial_failure(None, handler, &error); - Err(error) + return Err(error); } } } @@ -664,6 +664,171 @@ where &mut self.behaviour } + fn handle_pool_event( + &mut self, + event: PoolEvent, transport::Boxed<(PeerId, StreamMuxerBox)>>, + ) -> Option>> { + match event { + PoolEvent::ConnectionEstablished { + peer_id, + id, + endpoint, + other_established_connection_ids, + concurrent_dial_errors, + } => { + if self.banned_peers.contains(&peer_id) { + // Mark the connection for the banned peer as banned, thus withholding any + // future events from the connection to the behaviour. + self.banned_peer_connections.insert(id); + self.pool.disconnect(peer_id); + return Some(SwarmEvent::BannedPeer { peer_id, endpoint }); + } else { + let num_established = NonZeroU32::new( + u32::try_from(other_established_connection_ids.len() + 1).unwrap(), + ) + .expect("n + 1 is always non-zero; qed"); + let non_banned_established = other_established_connection_ids + .into_iter() + .filter(|conn_id| !self.banned_peer_connections.contains(&conn_id)) + .count(); + + log::debug!( + "Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}", + peer_id, + endpoint, + num_established, + non_banned_established + 1, + ); + let endpoint = endpoint.clone(); + let failed_addresses = concurrent_dial_errors + .as_ref() + .map(|es| es.iter().map(|(a, _)| a).cloned().collect()); + self.behaviour.inject_connection_established( + &peer_id, + &id, + &endpoint, + failed_addresses.as_ref(), + non_banned_established, + ); + return Some(SwarmEvent::ConnectionEstablished { + peer_id, + num_established, + endpoint, + concurrent_dial_errors, + }); + } + } + PoolEvent::PendingOutboundConnectionError { + id: _, + error, + handler, + peer, + } => { + let error = error.into(); + + self.behaviour.inject_dial_failure(peer, handler, &error); + + if let Some(peer) = peer { + log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,); + } else { + log::debug!("Connection attempt to unknown peer failed with {:?}", error); + } + + return Some(SwarmEvent::OutgoingConnectionError { + peer_id: peer, + error, + }); + } + PoolEvent::PendingInboundConnectionError { + id: _, + send_back_addr, + local_addr, + error, + handler, + } => { + log::debug!("Incoming connection failed: {:?}", error); + self.behaviour + .inject_listen_failure(&local_addr, &send_back_addr, handler); + return Some(SwarmEvent::IncomingConnectionError { + local_addr, + send_back_addr, + error, + }); + } + PoolEvent::ConnectionClosed { + id, + connected, + error, + remaining_established_connection_ids, + handler, + .. + } => { + if let Some(error) = error.as_ref() { + log::debug!( + "Connection closed with error {:?}: {:?}; Total (peer): {}.", + error, + connected, + remaining_established_connection_ids.len() + ); + } else { + log::debug!( + "Connection closed: {:?}; Total (peer): {}.", + connected, + remaining_established_connection_ids.len() + ); + } + let peer_id = connected.peer_id; + let endpoint = connected.endpoint; + let num_established = + u32::try_from(remaining_established_connection_ids.len()).unwrap(); + let conn_was_reported = !self.banned_peer_connections.remove(&id); + if conn_was_reported { + let remaining_non_banned = remaining_established_connection_ids + .into_iter() + .filter(|conn_id| !self.banned_peer_connections.contains(&conn_id)) + .count(); + self.behaviour.inject_connection_closed( + &peer_id, + &id, + &endpoint, + handler, + remaining_non_banned, + ); + } + return Some(SwarmEvent::ConnectionClosed { + peer_id, + endpoint, + cause: error, + num_established, + }); + } + PoolEvent::ConnectionEvent { peer_id, id, event } => { + if self.banned_peer_connections.contains(&id) { + log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id); + } else { + self.behaviour.inject_event(peer_id, id, event); + } + } + PoolEvent::AddressChange { + peer_id, + id, + new_endpoint, + old_endpoint, + } => { + if !self.banned_peer_connections.contains(&id) { + self.behaviour.inject_address_change( + &peer_id, + &id, + &old_endpoint, + &new_endpoint, + ); + } + } + } + + None + } + /// Internal function used by everything event-related. /// /// Polls the `Swarm` for the next event. @@ -777,161 +942,10 @@ where // Poll the known peers. match this.pool.poll(cx) { - Poll::Pending => { - connections_not_ready = true; - } - Poll::Ready(PoolEvent::ConnectionEstablished { - id, - peer_id, - endpoint, - other_established_connection_ids, - concurrent_dial_errors, - }) => { - if this.banned_peers.contains(&peer_id) { - // Mark the connection for the banned peer as banned, thus withholding any - // future events from the connection to the behaviour. - this.banned_peer_connections.insert(id); - this.pool.disconnect(peer_id); - return Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }); - } else { - let num_established = NonZeroU32::new( - u32::try_from(other_established_connection_ids.len() + 1).unwrap(), - ) - .expect("n + 1 is always non-zero; qed"); - let non_banned_established = other_established_connection_ids - .into_iter() - .filter(|conn_id| !this.banned_peer_connections.contains(conn_id)) - .count(); - - log::debug!( - "Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}", - peer_id, - endpoint, - num_established, - non_banned_established + 1, - ); - let failed_addresses = concurrent_dial_errors - .as_ref() - .map(|es| es.iter().map(|(a, _)| a).cloned().collect()); - this.behaviour.inject_connection_established( - &peer_id, - &id, - &endpoint, - failed_addresses.as_ref(), - non_banned_established, - ); - return Poll::Ready(SwarmEvent::ConnectionEstablished { - peer_id, - num_established, - endpoint, - concurrent_dial_errors, - }); - } - } - Poll::Ready(PoolEvent::PendingOutboundConnectionError { - id: _, - error, - handler, - peer, - }) => { - let error = error.into(); - - this.behaviour.inject_dial_failure(peer, handler, &error); - - if let Some(peer) = peer { - log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,); - } else { - log::debug!("Connection attempt to unknown peer failed with {:?}", error); - } - - return Poll::Ready(SwarmEvent::OutgoingConnectionError { - peer_id: peer, - error, - }); - } - Poll::Ready(PoolEvent::PendingInboundConnectionError { - id: _, - send_back_addr, - local_addr, - error, - handler, - }) => { - log::debug!("Incoming connection failed: {:?}", error); - this.behaviour - .inject_listen_failure(&local_addr, &send_back_addr, handler); - return Poll::Ready(SwarmEvent::IncomingConnectionError { - local_addr, - send_back_addr, - error, - }); - } - Poll::Ready(PoolEvent::ConnectionClosed { - id, - connected, - error, - remaining_established_connection_ids, - handler, - .. - }) => { - if let Some(error) = error.as_ref() { - log::debug!( - "Connection closed with error {:?}: {:?}; Total (peer): {}.", - error, - connected, - remaining_established_connection_ids.len() - ); - } else { - log::debug!( - "Connection closed: {:?}; Total (peer): {}.", - connected, - remaining_established_connection_ids.len() - ); - } - let peer_id = connected.peer_id; - let endpoint = connected.endpoint; - let num_established = - u32::try_from(remaining_established_connection_ids.len()).unwrap(); - let conn_was_reported = !this.banned_peer_connections.remove(&id); - if conn_was_reported { - let remaining_non_banned = remaining_established_connection_ids - .into_iter() - .filter(|conn_id| !this.banned_peer_connections.contains(conn_id)) - .count(); - this.behaviour.inject_connection_closed( - &peer_id, - &id, - &endpoint, - handler, - remaining_non_banned, - ); - } - return Poll::Ready(SwarmEvent::ConnectionClosed { - peer_id, - endpoint, - cause: error, - num_established, - }); - } - Poll::Ready(PoolEvent::ConnectionEvent { id, peer_id, event }) => { - if this.banned_peer_connections.contains(&id) { - log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id); - } else { - this.behaviour.inject_event(peer_id, id, event); - } - } - Poll::Ready(PoolEvent::AddressChange { - id, - peer_id, - new_endpoint, - old_endpoint, - }) => { - if !this.banned_peer_connections.contains(&id) { - this.behaviour.inject_address_change( - &peer_id, - &id, - &old_endpoint, - &new_endpoint, - ); + Poll::Pending => connections_not_ready = true, + Poll::Ready(pool_event) => { + if let Some(swarm_event) = this.handle_pool_event(pool_event) { + return Poll::Ready(swarm_event); } } };