Skip to content

Commit

Permalink
fix: remove external lock reference of network::peer_registry
Browse files Browse the repository at this point in the history
  • Loading branch information
jjyr committed Nov 22, 2018
1 parent bc99452 commit e088fd0
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 148 deletions.
37 changes: 13 additions & 24 deletions network/src/ckb_protocol_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ impl CKBProtocolContext for DefaultCKBProtocolContext {
protocol_id: ProtocolId,
data: Vec<u8>,
) -> Result<(), Error> {
let peers_registry = self.network.peers_registry().read();
if let Some(peer_id) = peers_registry.get_peer_id(peer_index) {
self.network.send(peer_id, protocol_id, data.into())
if let Some(peer_id) = self.network.get_peer_id(peer_index) {
self.network.send(&peer_id, protocol_id, data.into())
} else {
Err(ErrorKind::PeerNotFound.into())
}
Expand All @@ -88,23 +87,15 @@ impl CKBProtocolContext for DefaultCKBProtocolContext {
}
// ban peer
fn ban_peer(&self, peer_index: PeerIndex, timeout: Duration) {
let mut peers_registry = self.network.peers_registry().write();
if let Some(peer_id) = peers_registry
.get_peer_id(peer_index)
.map(|peer_id| peer_id.to_owned())
{
peers_registry.ban_peer(peer_id, timeout)
if let Some(peer_id) = self.network.get_peer_id(peer_index) {
self.network.ban_peer(peer_id, timeout)
}
}
// disconnect from peer
fn disconnect(&self, peer_index: PeerIndex) {
debug!(target: "network", "disconnect peer {}", peer_index);
let mut peers_registry = self.network.peers_registry().write();
if let Some(peer_id) = peers_registry
.get_peer_id(peer_index)
.map(|peer_id| peer_id.to_owned())
{
peers_registry.drop_peer(&peer_id)
if let Some(peer_id) = self.network.get_peer_id(peer_index) {
self.network.drop_peer(&peer_id)
}
}
fn register_timer(&self, token: TimerToken, duration: Duration) -> Result<(), Error> {
Expand All @@ -124,21 +115,21 @@ impl CKBProtocolContext for DefaultCKBProtocolContext {
Ok(())
}
fn session_info(&self, peer_index: PeerIndex) -> Option<SessionInfo> {
let peers_registry = self.network.peers_registry().read();
if let Some(session) = peers_registry
if let Some(session) = self
.network
.get_peer_id(peer_index)
.map(|peer_id| self.network.session_info(peer_id, self.protocol_id))
.map(|peer_id| self.network.session_info(&peer_id, self.protocol_id))
{
session
} else {
None
}
}
fn protocol_version(&self, peer_index: PeerIndex, protocol_id: ProtocolId) -> Option<u8> {
let peers_registry = self.network.peers_registry().read();
if let Some(protocol_version) = peers_registry
if let Some(protocol_version) = self
.network
.get_peer_id(peer_index)
.map(|peer_id| self.network.peer_protocol_version(peer_id, protocol_id))
.map(|peer_id| self.network.peer_protocol_version(&peer_id, protocol_id))
{
protocol_version
} else {
Expand All @@ -151,9 +142,7 @@ impl CKBProtocolContext for DefaultCKBProtocolContext {
}

fn connected_peers(&self) -> Vec<PeerIndex> {
let peers_registry = self.network.peers_registry().read();
let iter = peers_registry.connected_peers_indexes();
iter.collect::<Vec<_>>()
self.network.peers_indexes()
}
}

Expand Down
20 changes: 8 additions & 12 deletions network/src/ckb_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,13 @@ impl CKBService {
return Box::new(future::ok(())) as Box<_>;
}

let peer_index = {
let peers_registry = network.peers_registry().read();
match peers_registry.get(&peer_id) {
Some(peer) => peer.peer_index.unwrap(),
None => {
return Box::new(future::err(IoError::new(
IoErrorKind::Other,
format!("can't find peer {:?}", peer_id),
)))
}
let peer_index = match network.get_peer_index(&peer_id) {
Some(peer_index) => peer_index,
None => {
return Box::new(future::err(IoError::new(
IoErrorKind::Other,
format!("can't find peer {:?}", peer_id),
)))
}
};

Expand Down Expand Up @@ -118,8 +115,7 @@ impl CKBService {
)),
peer_index,
);
let mut peers_registry = network.peers_registry().write();
peers_registry.drop_peer(&peer_id);
network.drop_peer(&peer_id);
val
}
})
Expand Down
24 changes: 7 additions & 17 deletions network/src/discovery_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@ use transport::TransportOutput;

pub(crate) struct DiscoveryService {
timeout: Duration,
discovery_interval: Duration,
pub(crate) kad_system: Arc<kad::KadSystem>,
default_response_neighbour_count: usize,
pub(crate) kad_upgrade: kad::KadConnecConfig,
kad_manage: Arc<Mutex<KadManage>>,
}

Expand Down Expand Up @@ -83,16 +81,13 @@ impl DiscoveryService {
pub fn new(
timeout: Duration,
default_response_neighbour_count: usize,
discovery_interval: Duration,
kad_manage: Arc<Mutex<KadManage>>,
kad_system: Arc<kad::KadSystem>,
) -> Self {
DiscoveryService {
timeout,
kad_system,
kad_upgrade: kad::KadConnecConfig::new(),
default_response_neighbour_count,
discovery_interval,
kad_manage,
}
}
Expand All @@ -101,7 +96,7 @@ impl DiscoveryService {
&self,
network: Arc<Network>,
peer_id: PeerId,
client_addr: Multiaddr,
_client_addr: Multiaddr,
kad_connection_controller: kad::KadConnecController,
_endpoint: Endpoint,
kademlia_stream: Box<Stream<Item = kad::KadIncomingRequest, Error = IoError> + Send>,
Expand Down Expand Up @@ -299,7 +294,6 @@ where
Vec<Box<Stream<Item = kad::KadQueryEvent<Vec<PeerId>>, Error = IoError> + Send>>,
kad_system: Arc<kad::KadSystem>,
kad_manage: Arc<Mutex<KadManage>>,
kad_upgrade: kad::KadConnecConfig,
}

impl<SwarmTran, Tran, TranOut, T> DiscoveryQueryService<SwarmTran, Tran, TranOut, T>
Expand Down Expand Up @@ -331,7 +325,6 @@ where
discovery_interval: Duration,
kad_system: Arc<kad::KadSystem>,
kad_manage: Arc<Mutex<KadManage>>,
kad_upgrade: kad::KadConnecConfig,
) -> Self {
let (kad_controller_request_sender, kad_controller_request_receiver) = mpsc::unbounded();
DiscoveryQueryService {
Expand All @@ -345,7 +338,6 @@ where
kad_query_events: Vec::with_capacity(10),
kad_system,
kad_manage,
kad_upgrade,
kad_controller_request_sender,
kad_controller_request_receiver,
}
Expand All @@ -362,17 +354,16 @@ where
let query = self.kad_system.find_node(random_peer_id, {
let kad_manage = Arc::clone(&self.kad_manage);
let kad_controller_request_sender = self.kad_controller_request_sender.clone();
let kad_upgrade = self.kad_upgrade.clone();
move |peer_id| {
let (tx, rx) = oneshot::channel();
let mut kad_manage = kad_manage.lock();
kad_manage
.kad_pending_dials
.entry(peer_id.clone())
.or_insert(Vec::new())
.or_insert_with(Vec::new)
.push(tx);
debug!(target: "discovery", "find node from {:?} pending: {}", peer_id, kad_manage.kad_pending_dials.get(&peer_id).unwrap().len());
kad_controller_request_sender.unbounded_send(peer_id.clone());
debug!(target: "discovery", "find node from {:?} pending: {}", peer_id, kad_manage.kad_pending_dials[&peer_id].len());
kad_controller_request_sender.unbounded_send(peer_id.clone()).expect("send kad controller request");
rx.map_err(|err| {
IoError::new(
IoErrorKind::Other,
Expand Down Expand Up @@ -521,16 +512,14 @@ pub(crate) struct KadManage {
connected_kad_peers: FnvHashMap<PeerId, UniqueConnec<kad::KadConnecController>>,
kad_pending_dials: FnvHashMap<PeerId, Vec<oneshot::Sender<kad::KadConnecController>>>,
kad_upgrade: kad::KadConnecConfig,
network: Arc<Network>,
pub(crate) to_notify: Option<task::Task>,
}

impl KadManage {
pub fn new(network: Arc<Network>, kad_upgrade: kad::KadConnecConfig) -> Self {
pub fn new(_network: Arc<Network>, kad_upgrade: kad::KadConnecConfig) -> Self {
KadManage {
connected_kad_peers: FnvHashMap::with_capacity_and_hasher(10, Default::default()),
kad_pending_dials: FnvHashMap::with_capacity_and_hasher(10, Default::default()),
network,
kad_upgrade,
to_notify: None,
}
Expand Down Expand Up @@ -608,10 +597,11 @@ impl KadManage {
}
});

let dial_future = kad_connection.dial(swarm_controller, addr, transport);
let _ = kad_connection.dial(swarm_controller, addr, transport);
Ok(kad_connection)
}

#[allow(dead_code)]
fn drop_connection(&mut self, peer_id: &PeerId) {
debug!(target: "discovery","disconnect kad connection from {:?}", peer_id);
self.connected_kad_peers.remove(peer_id);
Expand Down
45 changes: 22 additions & 23 deletions network/src/identify_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,21 @@ impl IdentifyService {
trace!("process identify for peer_id {:?} with {:?}", peer_id, info);
// set identify info to peer
{
let mut peers_registry = network.peers_registry().write();
match peers_registry.get_mut(&peer_id) {
Some(peer) => {
peer.identify_info = Some(PeerIdentifyInfo {
client_version: info.agent_version.clone(),
protocol_version: info.protocol_version.clone(),
supported_protocols: info.protocols.clone(),
count_of_known_listen_addrs: info.listen_addrs.len(),
})
}
None => error!(
let identify_info = PeerIdentifyInfo {
client_version: info.agent_version.clone(),
protocol_version: info.protocol_version.clone(),
supported_protocols: info.protocols.clone(),
count_of_known_listen_addrs: info.listen_addrs.len(),
};
if network
.set_peer_identify_info(&peer_id, identify_info)
.is_err()
{
error!(
target: "network",
"can't find peer_id {:?} during process identify info",
peer_id
),
)
}
}

Expand Down Expand Up @@ -194,29 +194,28 @@ where
let _identify_timeout = self.identify_timeout;
let network = Arc::clone(&network);
move |_| {
let peers_registry = network.peers_registry().read();
for (peer_id, peer) in peers_registry.peers_iter() {
if let Some(ref identify_info) = peer.identify_info {
for peer_id in network.peers() {
if let Some(ref identify_info) = network.get_peer_identify_info(&peer_id) {
if identify_info.count_of_known_listen_addrs > 0 {
continue;
}
}
trace!(
target: "network",
"request identify to peer {:?} {:?}",
peer_id,
peer.remote_addresses
);
// TODO should we try all addresses?
if let Some(addr) = peer.remote_addresses.get(0) {
if let Some(addr) = network.get_peer_remote_addresses(&peer_id).get(0) {
trace!(
target: "network",
"request identify to peer {:?} {:?}",
peer_id,
addr
);
// dial identify
let _ = swarm_controller.dial(addr.clone(), transport.clone());
} else {
error!(
target: "network",
"error when prepare identify : can't find addresses for peer {:?}",
peer_id
);
);
}
}
Ok(())
Expand Down
Loading

0 comments on commit e088fd0

Please sign in to comment.