From eba8d2c8b485e9592118b33a2773a2a9cd2d26d5 Mon Sep 17 00:00:00 2001 From: Robert Klotzner Date: Tue, 24 Aug 2021 15:05:25 +0200 Subject: [PATCH] Fill up requests slots via `launch_parallel_requests` (#3681) in case waiting for the next response takes too long. --- node/network/availability-recovery/src/lib.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 2f9696cdfab6..ea5822e896a9 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -21,6 +21,7 @@ use std::{ collections::{HashMap, VecDeque}, pin::Pin, + time::Duration, }; use futures::{ @@ -43,7 +44,7 @@ use polkadot_node_network_protocol::{ IfDisconnected, UnifiedReputationChange as Rep, }; use polkadot_node_primitives::{AvailableData, ErasureChunk}; -use polkadot_node_subsystem_util::request_session_info; +use polkadot_node_subsystem_util::{request_session_info, TimeoutExt}; use polkadot_primitives::v1::{ AuthorityDiscoveryId, BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex, Hash, HashT, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, @@ -72,6 +73,10 @@ const LRU_SIZE: usize = 16; const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request"); +/// Max time we want to wait for responses, before calling `launch_parallel_requests` again to fill +/// up slots. +const MAX_CHUNK_WAIT: Duration = Duration::from_secs(1); + /// The Availability Recovery Subsystem. pub struct AvailabilityRecoverySubsystem { fast_path: bool, @@ -285,7 +290,11 @@ impl RequestChunksPhase { async fn wait_for_chunks(&mut self, params: &InteractionParams) { // Wait for all current requests to conclude or time-out, or until we reach enough chunks. - while let Some(request_result) = self.requesting_chunks.next().await { + // We will also stop, if there has not been a response for `MAX_CHUNK_WAIT`, so + // `launch_parallel_requests` cann fill up slots again. + while let Some(request_result) = + self.requesting_chunks.next().timeout(MAX_CHUNK_WAIT).await.flatten() + { match request_result { Ok(Some(chunk)) => { // Check merkle proofs of any received chunks.