Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Periodically call Peerset::alloc_slots on all sets #9025

Merged
3 commits merged into from
Jun 7, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion client/peerset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use futures::prelude::*;
use log::{debug, error, trace};
use serde_json::json;
use std::{collections::HashMap, pin::Pin, task::{Context, Poll}, time::Duration};
use wasm_timer::Instant;
use wasm_timer::{Delay, Instant};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};

pub use libp2p::PeerId;
Expand Down Expand Up @@ -252,6 +252,9 @@ pub struct Peerset {
created: Instant,
/// Last time when we updated the reputations of connected nodes.
latest_time_update: Instant,
/// Next time to do a periodic call to `alloc_slots` with all sets. This is done once per
/// second, to match the period of the reputation updates.
next_periodic_alloc_slots: Delay,
}

impl Peerset {
Expand Down Expand Up @@ -279,6 +282,7 @@ impl Peerset {
message_queue: VecDeque::new(),
created: now,
latest_time_update: now,
next_periodic_alloc_slots: Delay::new(Duration::new(0, 0)),
}
};

Expand Down Expand Up @@ -699,6 +703,14 @@ impl Stream for Peerset {
return Poll::Ready(Some(message));
}

if let Poll::Ready(_) = Future::poll(Pin::new(&mut self.next_periodic_alloc_slots), cx) {
self.next_periodic_alloc_slots = Delay::new(Duration::new(1, 0));

for set_index in 0..self.data.num_sets() {
self.alloc_slots(SetId(set_index));
}
}

let action = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(event)) => event,
Expand Down Expand Up @@ -907,4 +919,45 @@ mod tests {

futures::executor::block_on(fut);
}

#[test]
fn test_relloc_after_banned() {
let (mut peerset, handle) = Peerset::from_config(PeersetConfig {
sets: vec![SetConfig {
in_peers: 25,
out_peers: 25,
bootnodes: vec![],
reserved_nodes: Default::default(),
reserved_only: false,
}],
});

// We ban a node by setting its reputation under the threshold.
let peer_id = PeerId::random();
handle.report_peer(peer_id.clone(), ReputationChange::new(BANNED_THRESHOLD - 1, ""));

let fut = futures::future::poll_fn(move |cx| {
// We need one polling for the message to be processed.
assert_eq!(Stream::poll_next(Pin::new(&mut peerset), cx), Poll::Pending);

// Check that an incoming connection from that node gets refused.
// This is already tested in other tests, but it is done again here because it doesn't
// hurt.
peerset.incoming(SetId::from(0), peer_id.clone(), IncomingIndex(1));
if let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1)));
} else {
panic!()
}

// Wait for the peerset to change its mind and actually connect to it.
while let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
assert_eq!(msg.unwrap(), Message::Connect { set_id: SetId::from(0), peer_id });
}

Poll::Ready(())
});

futures::executor::block_on(fut);
}
}