Skip to content

Commit

Permalink
Merge #2628
Browse files Browse the repository at this point in the history
2628: feat: Fix download scheduler r=quake,doitian a=driftluo

1. disable penalty when download nodes are scarce
2. allow the protection node to be disconnected due to sync judgment

Co-authored-by: driftluo <driftluo@foxmail.com>
  • Loading branch information
bors[bot] and driftluo authored Apr 15, 2021
2 parents 0c9d0a3 + 34055d6 commit f84f391
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 14 deletions.
7 changes: 6 additions & 1 deletion sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,10 +576,15 @@ impl Synchronizer {
};

for peer in disconnect_list.iter() {
// It is not forbidden to evict protected nodes:
// - First of all, this node is not designated by the user for protection,
// but is connected randomly. It does not represent the will of the user
// - Secondly, in the synchronization phase, the nodes with zero download tasks are
// retained, apart from reducing the download efficiency, there is no benefit.
if self
.peers()
.get_flag(*peer)
.map(|flag| flag.is_whitelist || flag.is_protect)
.map(|flag| flag.is_whitelist)
.unwrap_or(false)
{
continue;
Expand Down
2 changes: 2 additions & 0 deletions sync/src/tests/inflight_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ fn inflight_blocks_timeout() {
let faketime_file = faketime::millis_tempfile(0).expect("create faketime file");
faketime::enable(&faketime_file);
let mut inflight_blocks = InflightBlocks::default();
inflight_blocks.protect_num = 0;

assert!(inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into()));
assert!(inflight_blocks.insert(1.into(), (2, h256!("0x2").pack()).into()));
Expand Down Expand Up @@ -147,6 +148,7 @@ fn inflight_trace_number_state() {
faketime::enable(&faketime_file);

let mut inflight_blocks = InflightBlocks::default();
inflight_blocks.protect_num = 0;

assert!(inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into()));
assert!(inflight_blocks.insert(2.into(), (2, h256!("0x2").pack()).into()));
Expand Down
45 changes: 36 additions & 9 deletions sync/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use ckb_channel::Receiver;
use ckb_constant::sync::{
BLOCK_DOWNLOAD_TIMEOUT, HEADERS_DOWNLOAD_HEADERS_PER_SECOND, HEADERS_DOWNLOAD_INSPECT_WINDOW,
HEADERS_DOWNLOAD_TOLERABLE_BIAS_FOR_SINGLE_SAMPLE, INIT_BLOCKS_IN_TRANSIT_PER_PEER,
MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN, POW_INTERVAL, RETRY_ASK_TX_TIMEOUT_INCREASE,
SUSPEND_SYNC_TIME,
MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN, MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
POW_INTERVAL, RETRY_ASK_TX_TIMEOUT_INCREASE, SUSPEND_SYNC_TIME,
};
use ckb_error::AnyError;
use ckb_logger::{debug, debug_target, error, trace};
Expand Down Expand Up @@ -561,6 +561,7 @@ pub struct InflightBlocks {
pub(crate) restart_number: BlockNumber,
time_analyzer: TimeAnalyzer,
pub(crate) adjustment: bool,
pub(crate) protect_num: usize,
}

impl Default for InflightBlocks {
Expand All @@ -573,6 +574,7 @@ impl Default for InflightBlocks {
restart_number: 0,
time_analyzer: TimeAnalyzer::default(),
adjustment: true,
protect_num: MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
}
}
}
Expand Down Expand Up @@ -679,6 +681,16 @@ impl InflightBlocks {
pub fn prune(&mut self, tip: BlockNumber) -> HashSet<PeerIndex> {
let now = unix_time_as_millis();
let mut disconnect_list = HashSet::new();
// Since statistics are currently disturbed by the processing block time, when the number
// of transactions increases, the node will be accidentally evicted.
//
// Especially on machines with poor CPU performance, the node connection will be frequently
// disconnected due to statistics.
//
// In order to protect the decentralization of the network and ensure the survival of low-performance
// nodes, the penalty mechanism will be closed when the number of download nodes is less than the number of protected nodes
let should_punish = self.download_schedulers.len() > self.protect_num;
let adjustment = self.adjustment;

let trace = &mut self.trace_number;
let download_schedulers = &mut self.download_schedulers;
Expand All @@ -698,7 +710,9 @@ impl InflightBlocks {
if value.timestamp + BLOCK_DOWNLOAD_TIMEOUT < now {
if let Some(set) = download_schedulers.get_mut(&value.peer) {
set.hashes.remove(key);
set.punish(2);
if should_punish {
set.punish(2);
}
};
if !trace.is_empty() {
trace.remove(&key);
Expand Down Expand Up @@ -738,13 +752,17 @@ impl InflightBlocks {
if now > 1000 + *time {
if let Some(state) = states.remove(key) {
if let Some(d) = download_schedulers.get_mut(&state.peer) {
d.punish(1);
if should_punish {
d.punish(1);
}
d.hashes.remove(key);
};
} else if let Some(v) = compact_inflight.remove(&key.hash) {
for peer in v {
if let Some(d) = download_schedulers.get_mut(&peer) {
d.punish(1);
if should_punish && adjustment {
for peer in v {
if let Some(d) = download_schedulers.get_mut(&peer) {
d.punish(1);
}
}
}
}
Expand Down Expand Up @@ -813,6 +831,7 @@ impl InflightBlocks {
}

pub fn remove_by_block(&mut self, block: BlockNumberAndHash) -> bool {
let should_punish = self.download_schedulers.len() > self.protect_num;
let download_schedulers = &mut self.download_schedulers;
let trace = &mut self.trace_number;
let compact = &mut self.compact_reconstruct_inflight;
Expand All @@ -831,8 +850,16 @@ impl InflightBlocks {
match time_analyzer.push_time(elapsed) {
TimeQuantile::MinToFast => set.increase(2),
TimeQuantile::FastToNormal => set.increase(1),
TimeQuantile::NormalToUpper => set.decrease(1),
TimeQuantile::UpperToMax => set.decrease(2),
TimeQuantile::NormalToUpper => {
if should_punish {
set.decrease(1)
}
}
TimeQuantile::UpperToMax => {
if should_punish {
set.decrease(2)
}
}
}
}
if !trace.is_empty() {
Expand Down
8 changes: 4 additions & 4 deletions test/src/specs/sync/get_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,17 @@ impl Spec for GetBlocksTimeout {

let received = wait_get_blocks_point(&net, &node1, block_download_timeout_secs * 2, 1);
assert!(
received.is_none(),
"Should not received GetBlocks anymore, the timeout could be any number."
received.is_some(),
"in the case of sparse connections, even if download times out, net should continue to receive GetBlock requests"
);

let rpc_client = node1.rpc_client();
let result = wait_until(10, || {
let peers = rpc_client.get_peers();
peers.is_empty()
!peers.is_empty()
});
if !result {
panic!("node1 must disconnect net");
panic!("node1 must not disconnect net");
}
}
}
Expand Down

0 comments on commit f84f391

Please sign in to comment.