Skip to content

Commit

Permalink
fix: add tests/peers_registry
Browse files Browse the repository at this point in the history
  • Loading branch information
jjyr committed Dec 18, 2018
1 parent 6790087 commit 9616a18
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 41 deletions.
2 changes: 2 additions & 0 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ mod peers_registry;
mod ping_service;
mod protocol;
mod protocol_service;
#[cfg(test)]
mod tests;
mod timer_service;
mod transport;

Expand Down
13 changes: 10 additions & 3 deletions network/src/memory_peer_store.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::peer_store::{Behaviour, PeerStore, Score, Status};
use crate::peer_store::{Behaviour, PeerStore, ReportResult, Score, ScoringSchema, Status};
use crate::PeerId;
use fnv::FnvHashMap;
use libp2p::core::Multiaddr;
use log::trace;
use peer_store::{Behaviour, PeerStore, ReportResult, Score, ScoringSchema, Status};
use log::{debug, trace};
use std::time::{Duration, Instant};

#[derive(Debug)]
Expand Down Expand Up @@ -49,6 +48,14 @@ impl MemoryPeerStore {
}

impl PeerStore for MemoryPeerStore {
fn new_connected_peer(&mut self, peer_id: &PeerId, address: Multiaddr) {
self.add_discovered_address(peer_id, address).unwrap();
}

fn scoring_schema(&self) -> &ScoringSchema {
&self.schema
}

fn add_discovered_address(&mut self, peer_id: &PeerId, address: Multiaddr) -> Result<(), ()> {
self.add_discovered_addresses(peer_id, vec![address])
.map(|_| ())
Expand Down
6 changes: 3 additions & 3 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::discovery_service::{DiscoveryQueryService, DiscoveryService, KadManag
use crate::identify_service::IdentifyService;
use crate::memory_peer_store::MemoryPeerStore;
use crate::outbound_peer_service::OutboundPeerService;
use crate::peer_store::{Behaviour, PeerStore};
use crate::peer_store::PeerStore;
use crate::peers_registry::{ConnectionStatus, PeerConnection, PeerIdentifyInfo, PeersRegistry};
use crate::ping_service::PingService;
use crate::protocol::Protocol;
Expand Down Expand Up @@ -255,7 +255,7 @@ impl Network {
.peer_store()
.write()
.add_discovered_address(peer_id, connected_addr);
let mut peer = peers_registry.get_mut(&peer_id).unwrap();
let peer = peers_registry.get_mut(&peer_id).unwrap();
Ok(self.ckb_protocol_connec(peer, protocol_id))
}
Err(err) => Err(err),
Expand All @@ -276,7 +276,7 @@ impl Network {
.peer_store()
.write()
.add_discovered_address(peer_id, connected_addr);
let mut peer = peers_registry.get_mut(&peer_id).unwrap();
let peer = peers_registry.get_mut(&peer_id).unwrap();
Ok(self.ckb_protocol_connec(peer, protocol_id))
}
Err(err) => Err(err),
Expand Down
2 changes: 0 additions & 2 deletions network/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ use crate::ckb_protocol_handler::CKBProtocolHandler;
use crate::ckb_protocol_handler::{CKBProtocolContext, DefaultCKBProtocolContext};
use crate::network::Network;
use crate::peer_store::PeerStore;
use crate::peers_registry::PeerConnection;
use crate::NetworkConfig;
use crate::{Error, ErrorKind, ProtocolId};
use ckb_util::RwLock;
use futures::future::Future;
use futures::sync::oneshot;
use libp2p::core::PeerId;
use log::{debug, info};
use std::boxed::Box;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
Expand Down
12 changes: 9 additions & 3 deletions network/src/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,14 @@ impl Default for ScoringSchema {
}

pub trait PeerStore: Send + Sync {
// update peer addresses, return numbers of new inserted line
// return Err if peer not exists
// initial or update peer_info in peer_store
fn new_connected_peer(&mut self, peer_id: &PeerId, address: Multiaddr);
// add peer discovered addresses, return numbers of new inserted line, return Err if peer not exists
fn add_discovered_address(&mut self, peer_id: &PeerId, address: Multiaddr) -> Result<(), ()>;
fn add_discovered_addresses(
&mut self,
peer_id: &PeerId,
addresses: Vec<Multiaddr>,
address: Vec<Multiaddr>,
) -> Result<usize, ()>;
fn report(&mut self, peer_id: &PeerId, behaviour: Behaviour) -> ReportResult;
fn update_status(&mut self, peer_id: &PeerId, status: Status);
Expand All @@ -107,4 +108,9 @@ pub trait PeerStore: Send + Sync {
fn peers_to_attempt<'a>(&'a self) -> Box<Iterator<Item = (&'a PeerId, &'a Multiaddr)> + 'a>;
fn ban_peer(&mut self, peer_id: PeerId, timeout: Duration);
fn is_banned(&self, peer_id: &PeerId) -> bool;
fn scoring_schema(&self) -> &ScoringSchema;
fn peer_score_or_default(&self, peer_id: &PeerId) -> Score {
self.peer_score(peer_id)
.unwrap_or_else(|| self.scoring_schema().peer_init_score())
}
}
82 changes: 52 additions & 30 deletions network/src/peers_registry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::network_group::{Group, NetworkGroup};
use crate::peer_store::PeerStore;
use crate::peer_store::{PeerStore, Score};
use crate::{Error, ErrorKind, PeerId, PeerIndex, ProtocolId};
use bytes::Bytes;
use ckb_util::RwLock;
Expand Down Expand Up @@ -154,17 +154,18 @@ pub(crate) struct PeersRegistry {

fn find_most_peers_in_same_network_group<'a>(
peers: impl Iterator<Item = (&'a PeerId, &'a PeerConnection)>,
) -> Vec<(&'a PeerId, &'a PeerConnection)> {
let mut groups: FnvHashMap<Group, Vec<(&'a PeerId, &'a PeerConnection)>> =
) -> Vec<&'a PeerId> {
let mut groups: FnvHashMap<Group, Vec<&'a PeerId>> =
FnvHashMap::with_capacity_and_hasher(16, Default::default());
let largest_group_len = 0;
let mut largest_group_len = 0;
let mut largest_group: Group = Default::default();

for (peer_id, peer) in peers {
let group_name = peer.network_group();
let mut group = groups.entry(group_name.clone()).or_insert_with(Vec::new);
group.push((peer_id, peer));
let group = groups.entry(group_name.clone()).or_insert_with(Vec::new);
group.push(peer_id);
if group.len() > largest_group_len {
largest_group_len = group.len();
largest_group = group_name;
}
}
Expand Down Expand Up @@ -200,28 +201,35 @@ impl PeersRegistry {
self.peers.get_peer_id(peer_index)
}

pub fn is_reserved(&self, peer_id: &PeerId) -> bool {
self.reserved_peers.contains(&peer_id)
}

pub fn accept_inbound_peer(&mut self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
if self.peers.get(&peer_id).is_some() {
return Ok(());
}
let is_reserved = self.reserved_peers.contains(&peer_id);
if !is_reserved {
if !self.is_reserved(&peer_id) {
if self.reserved_only {
return Err(ErrorKind::InvalidNewPeer(format!(
"We are in reserved_only mode, rejected non-reserved peer {:?}",
peer_id
))
.into());
}
if self.peer_store.read().is_banned(&peer_id) {
return Err(
ErrorKind::InvalidNewPeer(format!("peer {:?} is denied", peer_id)).into(),
);
}
let candidate_score = {
let peer_store = self.peer_store.read();
if peer_store.is_banned(&peer_id) {
return Err(
ErrorKind::InvalidNewPeer(format!("peer {:?} is denied", peer_id)).into(),
);
}
peer_store.peer_score_or_default(&peer_id)
};
let connection_status = self.connection_status();
// check peers connection limitation
if connection_status.unreserved_inbound >= self.max_inbound
&& !self.try_evict_inbound_peer()
&& !self.try_evict_inbound_peer(candidate_score)
{
return Err(ErrorKind::InvalidNewPeer(format!(
"reach max inbound peers limitation, reject peer {:?}",
Expand All @@ -234,15 +242,18 @@ impl PeersRegistry {
Ok(())
}

fn try_evict_inbound_peer(&mut self) -> bool {
let peer_id: PeerId = {
let inbound_peers = self.peers.iter().filter(|(_, peer)| peer.is_inbound());
fn try_evict_inbound_peer(&mut self, candidate_score: Score) -> bool {
let (peer_id, score) = {
let inbound_peers = self
.peers
.iter()
.filter(|(peer_id, peer)| peer.is_inbound() && !self.is_reserved(peer_id));
let candidate_peers = find_most_peers_in_same_network_group(inbound_peers);
let peer_store = self.peer_store.read();

let mut lowest_score = std::i32::MAX;
let mut low_score_peers = Vec::new();
for (peer_id, _peer) in candidate_peers {
for peer_id in candidate_peers {
if let Some(score) = peer_store.peer_score(peer_id) {
if score > lowest_score {
continue;
Expand All @@ -251,7 +262,6 @@ impl PeersRegistry {
lowest_score = score;
low_score_peers.clear();
}

low_score_peers.push(peer_id);
}
}
Expand All @@ -260,23 +270,29 @@ impl PeersRegistry {
return false;
}
let mut rng = thread_rng();
low_score_peers[..]
.choose(&mut rng)
.unwrap()
.to_owned()
.to_owned()
(
low_score_peers[..]
.choose(&mut rng)
.unwrap()
.to_owned()
.to_owned(),
lowest_score,
)
};
debug!("evict inbound peer {:?}", peer_id);
self.drop_peer(&peer_id);
true
if score < candidate_score {
debug!("evict inbound peer {:?}", peer_id);
self.drop_peer(&peer_id);
true
} else {
false
}
}

pub fn try_outbound_peer(&mut self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
if self.peers.get(&peer_id).is_some() {
return Ok(());
}
let is_reserved = self.reserved_peers.contains(&peer_id);
if !is_reserved {
if !self.is_reserved(&peer_id) {
if self.reserved_only {
return Err(ErrorKind::InvalidNewPeer(format!(
"We are in reserved_only mode, rejected non-reserved peer {:?}",
Expand Down Expand Up @@ -312,6 +328,9 @@ impl PeersRegistry {
connected_addr: Multiaddr,
endpoint: Endpoint,
) -> PeerIndex {
self.peer_store
.write()
.new_connected_peer(&peer_id, connected_addr.clone());
let peer = PeerConnection::new(connected_addr, endpoint);
let peer_index = self.peers.or_insert(peer_id.clone(), peer);
debug!(target: "network", "allocate peer_index {} to peer {:?}", peer_index, peer_id);
Expand All @@ -338,8 +357,11 @@ impl PeersRegistry {
let mut total: u32 = 0;
let mut unreserved_inbound: u32 = 0;
let mut unreserved_outbound: u32 = 0;
for (_, peer_connection) in self.peers.iter() {
for (peer_id, peer_connection) in self.peers.iter() {
total += 1;
if self.is_reserved(peer_id) {
continue;
}
if peer_connection.is_outbound() {
unreserved_outbound += 1;
} else {
Expand Down
2 changes: 2 additions & 0 deletions network/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#[cfg(test)]
mod peers_registry;
Loading

0 comments on commit 9616a18

Please sign in to comment.