Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Periodically call Peerset::alloc_slots on all sets (#9025)
Browse files Browse the repository at this point in the history
* Periodically call alloc_slots on all slots

* Add test
  • Loading branch information
tomaka authored Jun 7, 2021
1 parent 1fa8cf7 commit 5d89967
Showing 1 changed file with 54 additions and 1 deletion.
55 changes: 54 additions & 1 deletion client/peerset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use futures::prelude::*;
use log::{debug, error, trace};
use serde_json::json;
use std::{collections::HashMap, pin::Pin, task::{Context, Poll}, time::Duration};
use wasm_timer::Instant;
use wasm_timer::{Delay, Instant};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};

pub use libp2p::PeerId;
Expand Down Expand Up @@ -252,6 +252,9 @@ pub struct Peerset {
created: Instant,
/// Last time when we updated the reputations of connected nodes.
latest_time_update: Instant,
/// Next time to do a periodic call to `alloc_slots` with all sets. This is done once per
/// second, to match the period of the reputation updates.
next_periodic_alloc_slots: Delay,
}

impl Peerset {
Expand Down Expand Up @@ -279,6 +282,7 @@ impl Peerset {
message_queue: VecDeque::new(),
created: now,
latest_time_update: now,
next_periodic_alloc_slots: Delay::new(Duration::new(0, 0)),
}
};

Expand Down Expand Up @@ -699,6 +703,14 @@ impl Stream for Peerset {
return Poll::Ready(Some(message));
}

if let Poll::Ready(_) = Future::poll(Pin::new(&mut self.next_periodic_alloc_slots), cx) {
self.next_periodic_alloc_slots = Delay::new(Duration::new(1, 0));

for set_index in 0..self.data.num_sets() {
self.alloc_slots(SetId(set_index));
}
}

let action = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(event)) => event,
Expand Down Expand Up @@ -907,4 +919,45 @@ mod tests {

futures::executor::block_on(fut);
}

#[test]
fn test_relloc_after_banned() {
let (mut peerset, handle) = Peerset::from_config(PeersetConfig {
sets: vec![SetConfig {
in_peers: 25,
out_peers: 25,
bootnodes: vec![],
reserved_nodes: Default::default(),
reserved_only: false,
}],
});

// We ban a node by setting its reputation under the threshold.
let peer_id = PeerId::random();
handle.report_peer(peer_id.clone(), ReputationChange::new(BANNED_THRESHOLD - 1, ""));

let fut = futures::future::poll_fn(move |cx| {
// We need one polling for the message to be processed.
assert_eq!(Stream::poll_next(Pin::new(&mut peerset), cx), Poll::Pending);

// Check that an incoming connection from that node gets refused.
// This is already tested in other tests, but it is done again here because it doesn't
// hurt.
peerset.incoming(SetId::from(0), peer_id.clone(), IncomingIndex(1));
if let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1)));
} else {
panic!()
}

// Wait for the peerset to change its mind and actually connect to it.
while let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
assert_eq!(msg.unwrap(), Message::Connect { set_id: SetId::from(0), peer_id });
}

Poll::Ready(())
});

futures::executor::block_on(fut);
}
}

0 comments on commit 5d89967

Please sign in to comment.