Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Fill up requests slots via launch_parallel_requests (#3681)
Browse files Browse the repository at this point in the history
in case waiting for the next response takes too long.
  • Loading branch information
eskimor committed Aug 24, 2021
1 parent b1e0eef commit eba8d2c
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions node/network/availability-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use std::{
collections::{HashMap, VecDeque},
pin::Pin,
time::Duration,
};

use futures::{
Expand All @@ -43,7 +44,7 @@ use polkadot_node_network_protocol::{
IfDisconnected, UnifiedReputationChange as Rep,
};
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_node_subsystem_util::request_session_info;
use polkadot_node_subsystem_util::{request_session_info, TimeoutExt};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex,
Hash, HashT, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex,
Expand Down Expand Up @@ -72,6 +73,10 @@ const LRU_SIZE: usize = 16;

const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");

/// Max time we want to wait for responses, before calling `launch_parallel_requests` again to fill
/// up slots.
const MAX_CHUNK_WAIT: Duration = Duration::from_secs(1);

/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
fast_path: bool,
Expand Down Expand Up @@ -285,7 +290,11 @@ impl RequestChunksPhase {

async fn wait_for_chunks(&mut self, params: &InteractionParams) {
// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
while let Some(request_result) = self.requesting_chunks.next().await {
// We will also stop, if there has not been a response for `MAX_CHUNK_WAIT`, so
// `launch_parallel_requests` cann fill up slots again.
while let Some(request_result) =
self.requesting_chunks.next().timeout(MAX_CHUNK_WAIT).await.flatten()
{
match request_result {
Ok(Some(chunk)) => {
// Check merkle proofs of any received chunks.
Expand Down

0 comments on commit eba8d2c

Please sign in to comment.