diff --git a/client/peerset/src/lib.rs b/client/peerset/src/lib.rs index eefab81b851da..19260afccb802 100644 --- a/client/peerset/src/lib.rs +++ b/client/peerset/src/lib.rs @@ -39,7 +39,7 @@ use futures::prelude::*; use log::{debug, error, trace}; use serde_json::json; use std::{collections::HashMap, pin::Pin, task::{Context, Poll}, time::Duration}; -use wasm_timer::Instant; +use wasm_timer::{Delay, Instant}; use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; pub use libp2p::PeerId; @@ -252,6 +252,9 @@ pub struct Peerset { created: Instant, /// Last time when we updated the reputations of connected nodes. latest_time_update: Instant, + /// Next time to do a periodic call to `alloc_slots` with all sets. This is done once per + /// second, to match the period of the reputation updates. + next_periodic_alloc_slots: Delay, } impl Peerset { @@ -279,6 +282,7 @@ impl Peerset { message_queue: VecDeque::new(), created: now, latest_time_update: now, + next_periodic_alloc_slots: Delay::new(Duration::new(0, 0)), } }; @@ -699,6 +703,14 @@ impl Stream for Peerset { return Poll::Ready(Some(message)); } + if let Poll::Ready(_) = Future::poll(Pin::new(&mut self.next_periodic_alloc_slots), cx) { + self.next_periodic_alloc_slots = Delay::new(Duration::new(1, 0)); + + for set_index in 0..self.data.num_sets() { + self.alloc_slots(SetId(set_index)); + } + } + let action = match Stream::poll_next(Pin::new(&mut self.rx), cx) { Poll::Pending => return Poll::Pending, Poll::Ready(Some(event)) => event, @@ -907,4 +919,45 @@ mod tests { futures::executor::block_on(fut); } + + #[test] + fn test_relloc_after_banned() { + let (mut peerset, handle) = Peerset::from_config(PeersetConfig { + sets: vec![SetConfig { + in_peers: 25, + out_peers: 25, + bootnodes: vec![], + reserved_nodes: Default::default(), + reserved_only: false, + }], + }); + + // We ban a node by setting its reputation under the threshold. + let peer_id = PeerId::random(); + handle.report_peer(peer_id.clone(), ReputationChange::new(BANNED_THRESHOLD - 1, "")); + + let fut = futures::future::poll_fn(move |cx| { + // We need one polling for the message to be processed. + assert_eq!(Stream::poll_next(Pin::new(&mut peerset), cx), Poll::Pending); + + // Check that an incoming connection from that node gets refused. + // This is already tested in other tests, but it is done again here because it doesn't + // hurt. + peerset.incoming(SetId::from(0), peer_id.clone(), IncomingIndex(1)); + if let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) { + assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1))); + } else { + panic!() + } + + // Wait for the peerset to change its mind and actually connect to it. + while let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) { + assert_eq!(msg.unwrap(), Message::Connect { set_id: SetId::from(0), peer_id }); + } + + Poll::Ready(()) + }); + + futures::executor::block_on(fut); + } }