Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Fix download scheduler #2628

Merged
merged 2 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,10 +576,15 @@ impl Synchronizer {
};

for peer in disconnect_list.iter() {
// It is not forbidden to evict protected nodes:
// - First of all, this node is not designated by the user for protection,
// but is connected randomly. It does not represent the will of the user
// - Secondly, in the synchronization phase, the nodes with zero download tasks are
// retained, apart from reducing the download efficiency, there is no benefit.
if self
.peers()
.get_flag(*peer)
.map(|flag| flag.is_whitelist || flag.is_protect)
.map(|flag| flag.is_whitelist)
.unwrap_or(false)
{
continue;
Expand Down
2 changes: 2 additions & 0 deletions sync/src/tests/inflight_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ fn inflight_blocks_timeout() {
let faketime_file = faketime::millis_tempfile(0).expect("create faketime file");
faketime::enable(&faketime_file);
let mut inflight_blocks = InflightBlocks::default();
inflight_blocks.protect_num = 0;

assert!(inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into()));
assert!(inflight_blocks.insert(1.into(), (2, h256!("0x2").pack()).into()));
Expand Down Expand Up @@ -147,6 +148,7 @@ fn inflight_trace_number_state() {
faketime::enable(&faketime_file);

let mut inflight_blocks = InflightBlocks::default();
inflight_blocks.protect_num = 0;

assert!(inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into()));
assert!(inflight_blocks.insert(2.into(), (2, h256!("0x2").pack()).into()));
Expand Down
45 changes: 36 additions & 9 deletions sync/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use ckb_chain_spec::consensus::Consensus;
use ckb_constant::sync::{
BLOCK_DOWNLOAD_TIMEOUT, HEADERS_DOWNLOAD_HEADERS_PER_SECOND, HEADERS_DOWNLOAD_INSPECT_WINDOW,
HEADERS_DOWNLOAD_TOLERABLE_BIAS_FOR_SINGLE_SAMPLE, INIT_BLOCKS_IN_TRANSIT_PER_PEER,
MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN, POW_INTERVAL, RETRY_ASK_TX_TIMEOUT_INCREASE,
SUSPEND_SYNC_TIME,
MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN, MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
POW_INTERVAL, RETRY_ASK_TX_TIMEOUT_INCREASE, SUSPEND_SYNC_TIME,
};
use ckb_error::AnyError;
use ckb_logger::{debug, debug_target, error, trace};
Expand Down Expand Up @@ -565,6 +565,7 @@ pub struct InflightBlocks {
pub(crate) restart_number: BlockNumber,
time_analyzer: TimeAnalyzer,
pub(crate) adjustment: bool,
pub(crate) protect_num: usize,
}

impl Default for InflightBlocks {
Expand All @@ -577,6 +578,7 @@ impl Default for InflightBlocks {
restart_number: 0,
time_analyzer: TimeAnalyzer::default(),
adjustment: true,
protect_num: MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
}
}
}
Expand Down Expand Up @@ -683,6 +685,16 @@ impl InflightBlocks {
pub fn prune(&mut self, tip: BlockNumber) -> HashSet<PeerIndex> {
let now = unix_time_as_millis();
let mut disconnect_list = HashSet::new();
// Since statistics are currently disturbed by the processing block time, when the number
// of transactions increases, the node will be accidentally evicted.
//
// Especially on machines with poor CPU performance, the node connection will be frequently
// disconnected due to statistics.
//
// In order to protect the decentralization of the network and ensure the survival of low-performance
// nodes, the penalty mechanism will be closed when the number of download nodes is less than the number of protected nodes
let should_punish = self.download_schedulers.len() > self.protect_num;
Copy link
Member

@doitian doitian Apr 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a small suggestion: this can be extracted as a small function self.should_punish()

let adjustment = self.adjustment;

let trace = &mut self.trace_number;
let download_schedulers = &mut self.download_schedulers;
Expand All @@ -702,7 +714,9 @@ impl InflightBlocks {
if value.timestamp + BLOCK_DOWNLOAD_TIMEOUT < now {
if let Some(set) = download_schedulers.get_mut(&value.peer) {
set.hashes.remove(key);
set.punish(2);
if should_punish {
set.punish(2);
}
};
if !trace.is_empty() {
trace.remove(&key);
Expand Down Expand Up @@ -742,13 +756,17 @@ impl InflightBlocks {
if now > 1000 + *time {
if let Some(state) = states.remove(key) {
if let Some(d) = download_schedulers.get_mut(&state.peer) {
d.punish(1);
if should_punish {
d.punish(1);
}
d.hashes.remove(key);
};
} else if let Some(v) = compact_inflight.remove(&key.hash) {
for peer in v {
if let Some(d) = download_schedulers.get_mut(&peer) {
d.punish(1);
if should_punish && adjustment {
for peer in v {
if let Some(d) = download_schedulers.get_mut(&peer) {
d.punish(1);
}
}
}
}
Expand Down Expand Up @@ -817,6 +835,7 @@ impl InflightBlocks {
}

pub fn remove_by_block(&mut self, block: BlockNumberAndHash) -> bool {
let should_punish = self.download_schedulers.len() > self.protect_num;
let download_schedulers = &mut self.download_schedulers;
let trace = &mut self.trace_number;
let compact = &mut self.compact_reconstruct_inflight;
Expand All @@ -835,8 +854,16 @@ impl InflightBlocks {
match time_analyzer.push_time(elapsed) {
TimeQuantile::MinToFast => set.increase(2),
TimeQuantile::FastToNormal => set.increase(1),
TimeQuantile::NormalToUpper => set.decrease(1),
TimeQuantile::UpperToMax => set.decrease(2),
TimeQuantile::NormalToUpper => {
if should_punish {
set.decrease(1)
}
}
TimeQuantile::UpperToMax => {
if should_punish {
set.decrease(2)
}
}
}
}
if !trace.is_empty() {
Expand Down
8 changes: 4 additions & 4 deletions test/src/specs/sync/get_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,17 @@ impl Spec for GetBlocksTimeout {

let received = wait_get_blocks_point(&net, &node1, block_download_timeout_secs * 2, 1);
assert!(
received.is_none(),
"Should not received GetBlocks anymore, the timeout could be any number."
received.is_some(),
quake marked this conversation as resolved.
Show resolved Hide resolved
"in the case of sparse connections, even if download times out, net should continue to receive GetBlock requests"
);

let rpc_client = node1.rpc_client();
let result = wait_until(10, || {
let peers = rpc_client.get_peers();
peers.is_empty()
!peers.is_empty()
});
if !result {
panic!("node1 must disconnect net");
panic!("node1 must not disconnect net");
}
}
}
Expand Down