Skip to content

Commit

Permalink
fix: Fix network unexpected connections to self (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
jjyr authored and doitian committed Nov 27, 2018
1 parent b7d092c commit f4644b8
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
17 changes: 15 additions & 2 deletions network/src/discovery_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::time::Instant;
use std::usize;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::prelude::{task, Async, Poll};
use tokio::spawn;
use tokio::timer::Interval;
use tokio::timer::Timeout;
use transport::TransportOutput;
Expand Down Expand Up @@ -375,6 +376,14 @@ where

fn handle_kad_controller_request(&self, peer_id: PeerId) {
let mut kad_manage = self.kad_manage.lock();
if &peer_id == self.network.local_peer_id() {
debug!(
target: "discovery",
"ignore kad dial to self"
);
kad_manage.kad_pending_dials.remove(&peer_id);
return;
}
let peer_store = self.network.peer_store().read();
if let Some(addrs) = peer_store.peer_addrs(&peer_id) {
for addr in addrs {
Expand Down Expand Up @@ -648,12 +657,16 @@ impl KadManage {
}
});

let _ = kad_connection.dial(swarm_controller, addr, transport);
let dial_future = kad_connection.dial(swarm_controller, addr, transport);
spawn(dial_future.then(|err| {
debug!(target: "discovery", "dialing result {:?}", err);
future::ok(())
}));
Ok(())
}

fn drop_connection(&mut self, peer_id: &PeerId) {
debug!(target: "discovery","disconnect kad connection from {:?}", peer_id);
debug!(target: "discovery","drop kad connection from {:?}", peer_id);
self.kad_connections.remove(peer_id);
}
}
15 changes: 13 additions & 2 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,18 @@ pub struct Network {
pub(crate) original_listened_addresses: RwLock<Vec<Multiaddr>>,
pub(crate) ckb_protocols: CKBProtocols<Arc<CKBProtocolHandler>>,
local_private_key: secio::SecioKeyPair,
local_peer_id: PeerId,
}

impl Network {
pub fn drop_peer(&self, peer_id: &PeerId) {
self.peers_registry.write().drop_peer(&peer_id);
}

pub fn local_peer_id(&self) -> &PeerId {
&self.local_peer_id
}

pub(crate) fn add_peer(&self, peer_id: PeerId, peer: PeerConnection) {
let mut peers_registry = self.peers_registry.write();
peers_registry.add_peer(peer_id, peer);
Expand Down Expand Up @@ -314,6 +319,11 @@ impl Network {
St::IncomingUpgrade: Send,
C: Send + 'static,
{
if expected_peer_id == self.local_peer_id() {
debug!(target: "network", "ignore dial to self");
return;
}
debug!(target: "network", "dial to peer {:?} address {:?}", expected_peer_id, addr);
for protocol in &self.ckb_protocols.0 {
self.dial_to_peer_protocol(
transport.clone(),
Expand Down Expand Up @@ -450,6 +460,7 @@ impl Network {
original_listened_addresses: RwLock::new(Vec::new()),
ckb_protocols: CKBProtocols(ckb_protocols),
local_private_key: local_private_key.clone(),
local_peer_id: local_private_key.to_peer_id(),
});
Ok(network)
}
Expand Down Expand Up @@ -481,9 +492,9 @@ impl Network {
let local_peer_id = local_peer_id.clone();
move |(peer_id, stream), _endpoint, remote_addr_fut| {
remote_addr_fut.and_then(move |remote_addr| {
trace!(target: "network", "connection from {:?}", remote_addr);
debug!(target: "network", "connection from {:?} peer_id: {:?}", remote_addr, peer_id);
if peer_id == local_peer_id {
trace!(target: "network", "connect to self, disconnect");
debug!(target: "network", "connect to self, disconnect");
return Err(IoErrorKind::ConnectionRefused.into());
}
let out = TransportOutput {
Expand Down
10 changes: 8 additions & 2 deletions network/src/outgoing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl<T: Send + 'static> ProtocolService<T> for OutgoingService {
}).for_each({
let transport = transport.clone();
let timeout = self.timeout;
let network = Arc::clone(&network);
move |_| {
let connection_status = network.connection_status();
let new_outgoing = (connection_status.max_outgoing
Expand All @@ -88,8 +89,13 @@ impl<T: Send + 'static> ProtocolService<T> for OutgoingService {
for (peer_id, addr) in peer_store
.peers_to_attempt()
.take(new_outgoing)
.map(|(addr, peer_id)| (addr.clone(), peer_id.clone()))
{
.filter_map(|(peer_id, addr)| {
if network.local_peer_id() != peer_id {
Some((peer_id.clone(), addr.clone()))
} else {
None
}
}) {
network.dial_to_peer(
transport.clone(),
&addr,
Expand Down

0 comments on commit f4644b8

Please sign in to comment.