Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Increase parallel downloads to 5 (#4045)
Browse files Browse the repository at this point in the history
* Increase parallel downloads to 5

* CLI param
  • Loading branch information
arkpar authored and gavofyork committed Nov 8, 2019
1 parent 56d8c8c commit 6fb3758
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 16 deletions.
2 changes: 2 additions & 0 deletions core/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ fn fill_network_configuration(
wasm_external_transport: None,
};

config.max_parallel_downloads = cli.max_parallel_downloads;

Ok(())
}

Expand Down
11 changes: 9 additions & 2 deletions core/cli/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ pub struct NetworkConfigurationParams {
pub port: Option<u16>,

/// Specify the number of outgoing connections we're trying to maintain.
#[structopt(long = "out-peers", value_name = "OUT_PEERS", default_value = "25")]
#[structopt(long = "out-peers", value_name = "COUNT", default_value = "25")]
pub out_peers: u32,

/// Specify the maximum number of incoming connections we're accepting.
#[structopt(long = "in-peers", value_name = "IN_PEERS", default_value = "25")]
#[structopt(long = "in-peers", value_name = "COUNT", default_value = "25")]
pub in_peers: u32,

/// Disable mDNS discovery.
Expand All @@ -160,6 +160,13 @@ pub struct NetworkConfigurationParams {
#[structopt(long = "no-mdns")]
pub no_mdns: bool,

/// Maximum number of peers to ask the same blocks in parallel.
///
/// This allows downlading announced blocks from multiple peers. Decrease to save
/// traffic and risk increased latency.
#[structopt(long = "max-parallel-downloads", value_name = "COUNT", default_value = "5")]
pub max_parallel_downloads: u32,

#[allow(missing_docs)]
#[structopt(flatten)]
pub node_key_params: NodeKeyParams
Expand Down
6 changes: 3 additions & 3 deletions core/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ impl TestNetFactory for GrandpaTestNet {

fn default_config() -> ProtocolConfig {
// the authority role ensures gossip hits all nodes here.
ProtocolConfig {
roles: Roles::AUTHORITY,
}
let mut config = ProtocolConfig::default();
config.roles = Roles::AUTHORITY;
config
}

fn make_verifier(
Expand Down
5 changes: 4 additions & 1 deletion core/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub struct Params<B: BlockT, S, H: ExHashT> {
pub specialization: S,

/// Type to check incoming block announcements.
pub block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
pub block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
}

bitflags! {
Expand Down Expand Up @@ -261,6 +261,8 @@ pub struct NetworkConfiguration {
pub node_name: String,
/// Configuration for the transport layer.
pub transport: TransportConfig,
/// Maximum number of peers to ask the same blocks in parallel.
pub max_parallel_downloads: u32,
}

impl Default for NetworkConfiguration {
Expand All @@ -282,6 +284,7 @@ impl Default for NetworkConfiguration {
enable_mdns: false,
wasm_external_transport: None,
},
max_parallel_downloads: 5,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions core/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,15 @@ struct ContextData<B: BlockT, H: ExHashT> {
pub struct ProtocolConfig {
/// Assigned roles.
pub roles: Roles,
/// Maximum number of peers to ask the same blocks in parallel.
pub max_parallel_downloads: u32,
}

impl Default for ProtocolConfig {
fn default() -> ProtocolConfig {
ProtocolConfig {
roles: Roles::FULL,
max_parallel_downloads: 5,
}
}
}
Expand All @@ -393,6 +396,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
&info,
finality_proof_request_builder,
block_announce_validator,
config.max_parallel_downloads,
);
let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config);
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
Expand Down
28 changes: 19 additions & 9 deletions core/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ pub struct ChainSync<B: BlockT> {
/// A flag that caches idle state with no pending requests.
is_idle: bool,
/// A type to check incoming block announcements.
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
/// Maximum number of peers to ask the same blocks in parallel.
max_parallel_downloads: u32,
}

/// All the data we have about a Peer that we are trying to sync with
Expand Down Expand Up @@ -282,7 +284,8 @@ impl<B: BlockT> ChainSync<B> {
client: Arc<dyn crate::chain::Client<B>>,
info: &ClientInfo<B>,
request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
max_parallel_downloads: u32,
) -> Self {
let mut required_block_attributes = BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION;

Expand All @@ -306,6 +309,7 @@ impl<B: BlockT> ChainSync<B> {
fork_targets: Default::default(),
is_idle: false,
block_announce_validator,
max_parallel_downloads,
}
}

Expand Down Expand Up @@ -571,6 +575,7 @@ impl<B: BlockT> ChainSync<B> {
let best_queued = self.best_queued_number;
let client = &self.client;
let queue = &self.queue_blocks;
let max_parallel = if major_sync { 1 } else { self.max_parallel_downloads };
let iter = self.peers.iter_mut().filter_map(move |(id, peer)| {
if !peer.state.is_available() {
trace!(target: "sync", "Peer {} is busy", id);
Expand All @@ -592,13 +597,19 @@ impl<B: BlockT> ChainSync<B> {
peer.state = PeerSyncState::DownloadingStale(hash);
have_requests = true;
Some((id.clone(), req))
} else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs, major_sync) {
} else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs, max_parallel) {
peer.state = PeerSyncState::DownloadingNew(range.start);
trace!(target: "sync", "New block request for {}", id);
trace!(
target: "sync",
"New block request for {}, (best:{}, common:{}) {:?}",
id,
peer.best_number,
peer.common_number,
req,
);
have_requests = true;
Some((id.clone(), req))
} else {
trace!(target: "sync", "No new block request for {}", id);
None
}
});
Expand Down Expand Up @@ -1006,7 +1017,7 @@ impl<B: BlockT> ChainSync<B> {
{
let header = &announce.header;
let number = *header.number();
debug!(target: "sync", "Received block announcement with number {:?}", number);
debug!(target: "sync", "Received block announcement {:?} with number {:?} from {}", hash, number, who);
if number.is_zero() {
warn!(target: "sync", "Ignored genesis block (#0) announcement from {}: {}", who, hash);
return OnBlockAnnounce::Nothing
Expand Down Expand Up @@ -1226,15 +1237,14 @@ fn peer_block_request<B: BlockT>(
peer: &PeerSync<B>,
blocks: &mut BlockCollection<B>,
attrs: &message::BlockAttributes,
major_sync: bool,
max_parallel_downloads: u32,
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
let max_parallel = if major_sync { 1 } else { 3 };
if let Some(range) = blocks.needed_blocks(
id.clone(),
MAX_BLOCKS_TO_REQUEST,
peer.best_number,
peer.common_number,
max_parallel,
max_parallel_downloads,
) {
let request = message::generic::BlockRequest {
id: 0,
Expand Down
4 changes: 4 additions & 0 deletions core/network/src/protocol/sync/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ impl<B: BlockT> BlockCollection<B> {
max_parallel: u32,
) -> Option<Range<NumberFor<B>>>
{
if peer_best <= common {
// Bail out early
return None;
}
// First block number that we need to download
let first_different = common + <NumberFor<B>>::one();
let count = (count as u32).into();
Expand Down
5 changes: 4 additions & 1 deletion core/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,10 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
let num_connected = Arc::new(AtomicUsize::new(0));
let is_major_syncing = Arc::new(AtomicBool::new(false));
let (protocol, peerset_handle) = Protocol::new(
protocol::ProtocolConfig { roles: params.roles },
protocol::ProtocolConfig {
roles: params.roles,
max_parallel_downloads: params.network_config.max_parallel_downloads,
},
params.chain,
params.on_demand.as_ref().map(|od| od.checker().clone())
.unwrap_or(Arc::new(AlwaysBadChecker)),
Expand Down
1 change: 1 addition & 0 deletions core/service/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ fn node_config<G, E: Clone> (
enable_mdns: false,
wasm_external_transport: None,
},
max_parallel_downloads: NetworkConfiguration::default().max_parallel_downloads,
};

Configuration {
Expand Down

0 comments on commit 6fb3758

Please sign in to comment.