
Prepare syncing for parallel sync strategies #3224

Merged (11 commits, Feb 13, 2024)
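For orientation before the diffs: this refactor threads a strategy-scoped `Key` through the request-producing `SyncingAction` variants and through `PendingResponses`, so responses can be routed back to the strategy that issued them and several strategies can hold in-flight requests to the same peer at once. A minimal sketch of that indexing idea follows; `PeerId`, `Key`, and `Pending` are simplified stand-ins for illustration, not the upstream definitions:

```rust
use std::collections::HashMap;

// Stand-ins for illustration only; upstream uses `libp2p::PeerId` and a
// strategy-specific `Key` type from `strategy/mod.rs`.
type PeerId = u64;
type Key = &'static str;

/// Pending requests indexed by `(peer, key)` rather than just `peer`, so two
/// strategies may each have one outstanding request to the same peer.
#[derive(Default)]
struct Pending(HashMap<(PeerId, Key), String>);

impl Pending {
    fn insert(&mut self, peer: PeerId, key: Key, request: String) {
        self.0.insert((peer, key), request);
    }

    /// Cancel one strategy's request without disturbing the others.
    fn remove(&mut self, peer: PeerId, key: Key) -> bool {
        self.0.remove(&(peer, key)).is_some()
    }

    /// Drop everything addressed to a disconnected or banned peer.
    fn remove_all(&mut self, peer: PeerId) {
        self.0.retain(|(p, _), _| *p != peer);
    }
}

fn main() {
    let mut pending = Pending::default();
    pending.insert(7, "chain-sync", "block request".into());
    pending.insert(7, "warp", "warp proof request".into());
    assert!(pending.remove(7, "warp")); // only the warp entry goes away
    pending.remove_all(7);
    assert!(pending.0.is_empty());
}
```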
102 changes: 45 additions & 57 deletions substrate/client/network/sync/src/engine.rs
@@ -33,7 +33,7 @@ use crate::{
},
strategy::{
warp::{EncodedProof, WarpProofRequest, WarpSyncParams},
SyncingAction, SyncingConfig, SyncingStrategy,
Key, SyncingAction, SyncingConfig, SyncingStrategy,
},
types::{
BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
@@ -214,9 +214,6 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Syncing strategy.
strategy: SyncingStrategy<B, Client>,

/// Syncing configuration for strategies.
syncing_config: SyncingConfig,

/// Blockchain client.
client: Arc<Client>,

@@ -441,8 +438,7 @@ where
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());

// Initialize syncing strategy.
let strategy =
SyncingStrategy::new(syncing_config.clone(), client.clone(), warp_sync_config)?;
let strategy = SyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?;

let block_announce_protocol_name = block_announce_config.protocol_name().clone();
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
@@ -471,7 +467,6 @@ where
roles,
client,
strategy,
syncing_config,
network_service,
peers: HashMap::new(),
block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
@@ -661,8 +656,15 @@ where
Some(event) => self.process_notification_event(event),
None => return,
},
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused =>
self.pass_warp_sync_target_block_header(warp_target_block_header),
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused => {
if let Err(_) = self.pass_warp_sync_target_block_header(warp_target_block_header) {
error!(
target: LOG_TARGET,
"Failed to set warp sync target block header, terminating `SyncingEngine`.",
);
return
}
},
response_event = self.pending_responses.select_next_some() =>
self.process_response_event(response_event),
validation_result = self.block_announce_validator.select_next_some() =>
@@ -675,21 +677,23 @@ where

// Process actions requested by a syncing strategy.
if let Err(e) = self.process_strategy_actions() {
error!("Terminating `SyncingEngine` due to fatal error: {e:?}");
error!(
target: LOG_TARGET,
"Terminating `SyncingEngine` due to fatal error: {e:?}.",
);
return
}
}
}

fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
for action in self.strategy.actions() {
for action in self.strategy.actions()? {
match action {
SyncingAction::SendBlockRequest { peer_id, request } => {
SyncingAction::SendBlockRequest { peer_id, key, request } => {
// Sending block request implies dropping obsolete pending response as we are
// not interested in it anymore (see [`SyncingAction::SendBlockRequest`]).
// Furthermore, only one request at a time is allowed to any peer.
let removed = self.pending_responses.remove(&peer_id);
self.send_block_request(peer_id, request.clone());
let removed = self.pending_responses.remove(peer_id, key);
self.send_block_request(peer_id, key, request.clone());

trace!(
target: LOG_TARGET,
@@ -699,24 +703,24 @@ where
removed,
)
},
SyncingAction::CancelBlockRequest { peer_id } => {
let removed = self.pending_responses.remove(&peer_id);
SyncingAction::CancelRequest { peer_id, key } => {
let removed = self.pending_responses.remove(peer_id, key);

trace!(
target: LOG_TARGET,
"Processed {action:?}, response removed: {removed}.",
);
},
SyncingAction::SendStateRequest { peer_id, request } => {
self.send_state_request(peer_id, request);
SyncingAction::SendStateRequest { peer_id, key, request } => {
self.send_state_request(peer_id, key, request);

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.",
);
},
SyncingAction::SendWarpProofRequest { peer_id, request } => {
self.send_warp_proof_request(peer_id, request.clone());
SyncingAction::SendWarpProofRequest { peer_id, key, request } => {
self.send_warp_proof_request(peer_id, key, request.clone());

trace!(
target: LOG_TARGET,
@@ -726,7 +730,7 @@ where
);
},
SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
self.pending_responses.remove(&peer_id);
self.pending_responses.remove_all(&peer_id);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(peer_id, rep);
@@ -753,20 +757,8 @@ where
number,
)
},
SyncingAction::Finished => {
let connected_peers = self.peers.iter().filter_map(|(peer_id, peer)| {
peer.info.roles.is_full().then_some((
*peer_id,
peer.info.best_hash,
peer.info.best_number,
))
});
self.strategy.switch_to_next(
self.syncing_config.clone(),
self.client.clone(),
connected_peers,
)?;
},
// Nothing to do, this is handled internally by `SyncingStrategy`.
SyncingAction::Finished => {},
}
}

@@ -948,23 +940,18 @@ where
}
}

fn pass_warp_sync_target_block_header(&mut self, header: Result<B::Header, oneshot::Canceled>) {
fn pass_warp_sync_target_block_header(
&mut self,
header: Result<B::Header, oneshot::Canceled>,
) -> Result<(), ()> {
match header {
Ok(header) =>
if let SyncingStrategy::WarpSyncStrategy(warp_sync) = &mut self.strategy {
warp_sync.set_target_block(header);
} else {
error!(
target: LOG_TARGET,
"Cannot set warp sync target block: no warp sync strategy is active."
);
debug_assert!(false);
},
Ok(header) => self.strategy.set_warp_sync_target_block_header(header),
Err(err) => {
error!(
target: LOG_TARGET,
"Failed to get target block for warp sync. Error: {err:?}",
);
Err(())
},
}
}
@@ -1002,7 +989,7 @@ where
}

self.strategy.remove_peer(&peer_id);
self.pending_responses.remove(&peer_id);
self.pending_responses.remove_all(&peer_id);
self.event_streams
.retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
}
@@ -1167,7 +1154,7 @@ where
Ok(())
}

fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest<B>) {
fn send_block_request(&mut self, peer_id: PeerId, key: Key, request: BlockRequest<B>) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}");
debug_assert!(false);
@@ -1178,12 +1165,13 @@ where

self.pending_responses.insert(
peer_id,
key,
PeerRequest::Block(request.clone()),
async move { downloader.download_blocks(peer_id, request).await }.boxed(),
);
}

fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) {
fn send_state_request(&mut self, peer_id: PeerId, key: Key, request: OpaqueStateRequest) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}");
debug_assert!(false);
@@ -1192,7 +1180,7 @@ where

let (tx, rx) = oneshot::channel();

self.pending_responses.insert(peer_id, PeerRequest::State, rx.boxed());
self.pending_responses.insert(peer_id, key, PeerRequest::State, rx.boxed());

match Self::encode_state_request(&request) {
Ok(data) => {
@@ -1213,7 +1201,7 @@ where
}
}

fn send_warp_proof_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) {
fn send_warp_proof_request(&mut self, peer_id: PeerId, key: Key, request: WarpProofRequest<B>) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}");
debug_assert!(false);
@@ -1222,7 +1210,7 @@ where

let (tx, rx) = oneshot::channel();

self.pending_responses.insert(peer_id, PeerRequest::WarpProof, rx.boxed());
self.pending_responses.insert(peer_id, key, PeerRequest::WarpProof, rx.boxed());

match &self.warp_sync_protocol_name {
Some(name) => self.network_service.start_request(
@@ -1259,14 +1247,14 @@ where
}

fn process_response_event(&mut self, response_event: ResponseEvent<B>) {
let ResponseEvent { peer_id, request, response } = response_event;
let ResponseEvent { peer_id, key, request, response } = response_event;

match response {
Ok(Ok((resp, _))) => match request {
PeerRequest::Block(req) => {
match self.block_downloader.block_response_into_blocks(&req, resp) {
Ok(blocks) => {
self.strategy.on_block_response(peer_id, req, blocks);
self.strategy.on_block_response(peer_id, key, req, blocks);
},
Err(BlockResponseError::DecodeFailed(e)) => {
debug!(
@@ -1311,10 +1299,10 @@ where
},
};

self.strategy.on_state_response(peer_id, response);
self.strategy.on_state_response(peer_id, key, response);
},
PeerRequest::WarpProof => {
self.strategy.on_warp_proof_response(&peer_id, EncodedProof(resp));
self.strategy.on_warp_proof_response(&peer_id, key, EncodedProof(resp));
},
},
Ok(Err(e)) => {
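The `pass_warp_sync_target_block_header` change above swaps logging-and-continuing for returning `Result<(), ()>`, so the `select!` loop, not the helper, owns the decision to terminate the engine. A minimal sketch of that control-flow pattern, with illustrative names (`pass_target_block_header`, `event_loop`) and stand-in types rather than the real engine code:

```rust
// `u32` stands in for the block header, `&str` for `oneshot::Canceled`.
fn pass_target_block_header(header: Result<u32, &'static str>) -> Result<(), ()> {
    match header {
        // Upstream forwards the header to the active strategy here.
        Ok(header) => {
            println!("warp sync target block header set: {header}");
            Ok(())
        },
        Err(err) => {
            eprintln!("Failed to get target block for warp sync. Error: {err:?}");
            Err(())
        },
    }
}

fn event_loop() {
    // The caller turns the helper's error into engine shutdown, keeping the
    // termination policy in one place.
    if pass_target_block_header(Err("oneshot canceled")).is_err() {
        eprintln!("Failed to set warp sync target block header, terminating `SyncingEngine`.");
        return;
    }
}

fn main() {
    event_loop();
}
```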
substrate/client/network/sync/src/{extra_requests.rs → justification_requests.rs}
@@ -16,6 +16,10 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Justification requests scheduling. [`ExtraRequests`] manages requesting justifications
//! from peers taking into account forks and their finalization (dropping pending requests
//! that don't make sense after one of the forks is finalized).

use crate::{
request_metrics::Metrics,
strategy::chain_sync::{PeerSync, PeerSyncState},
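The new module doc describes fork-aware scheduling of justification requests. A loose sketch of the pruning rule it alludes to, with a simplified data model rather than the real `ExtraRequests` API: once a block is finalized, pending requests for blocks at or below that height on competing forks can no longer be answered usefully and are dropped.

```rust
use std::collections::HashSet;

type Hash = [u8; 32];

/// Simplified stand-in for the pending-request set inside `ExtraRequests`.
struct PendingJustifications {
    /// `(block_hash, block_number)` pairs we still want justifications for.
    pending: HashSet<(Hash, u64)>,
}

impl PendingJustifications {
    /// Drop requests that finalization made moot: anything at or below the
    /// finalized height that is not the finalized block itself. (The real
    /// implementation checks ancestry against the finalized chain; treating
    /// everything at or below the height as moot is a simplification.)
    fn on_block_finalized(&mut self, finalized_hash: Hash, finalized_number: u64) {
        self.pending.retain(|(hash, number)| {
            *number > finalized_number || *hash == finalized_hash
        });
    }
}

fn main() {
    let mut reqs = PendingJustifications { pending: HashSet::new() };
    reqs.pending.insert(([1; 32], 10)); // on a fork below the finalized height
    reqs.pending.insert(([2; 32], 12)); // above the finalized height: kept
    reqs.on_block_finalized([9; 32], 11);
    assert_eq!(reqs.pending.len(), 1);
}
```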
2 changes: 1 addition & 1 deletion substrate/client/network/sync/src/lib.rs
@@ -23,8 +23,8 @@ pub use strategy::warp::{WarpSyncParams, WarpSyncPhase, WarpSyncProgress};
pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider};

mod block_announce_validator;
mod extra_requests;
mod futures_stream;
mod justification_requests;
mod pending_responses;
mod request_metrics;
mod schema;
31 changes: 23 additions & 8 deletions substrate/client/network/sync/src/pending_responses.rs
@@ -19,7 +19,7 @@
//! [`PendingResponses`] is responsible for keeping track of pending responses and
//! polling them. [`Stream`] implemented by [`PendingResponses`] never terminates.

use crate::{types::PeerRequest, LOG_TARGET};
use crate::{strategy::Key, types::PeerRequest, LOG_TARGET};
use futures::{
channel::oneshot,
future::BoxFuture,
@@ -42,14 +42,16 @@ type ResponseFuture = BoxFuture<'static, ResponseResult>;
/// An event we receive once a pending response future resolves.
pub(crate) struct ResponseEvent<B: BlockT> {
pub peer_id: PeerId,
pub key: Key,
pub request: PeerRequest<B>,
pub response: ResponseResult,
}

/// Stream taking care of polling pending responses.
pub(crate) struct PendingResponses<B: BlockT> {
/// Pending responses
pending_responses: StreamMap<PeerId, BoxStream<'static, (PeerRequest<B>, ResponseResult)>>,
pending_responses:
StreamMap<(PeerId, Key), BoxStream<'static, (PeerRequest<B>, ResponseResult)>>,
/// Waker to implement never terminating stream
waker: Option<Waker>,
}
@@ -62,6 +64,7 @@ impl<B: BlockT> PendingResponses<B> {
pub fn insert(
&mut self,
peer_id: PeerId,
key: Key,
request: PeerRequest<B>,
response_future: ResponseFuture,
) {
@@ -70,7 +73,7 @@
if self
.pending_responses
.insert(
peer_id,
(peer_id, key),
Box::pin(async move { (request, response_future.await) }.into_stream()),
)
.is_some()
@@ -87,8 +90,20 @@
}
}

pub fn remove(&mut self, peer_id: &PeerId) -> bool {
self.pending_responses.remove(peer_id).is_some()
pub fn remove(&mut self, peer_id: PeerId, key: Key) -> bool {
self.pending_responses.remove(&(peer_id, key)).is_some()
}

pub fn remove_all(&mut self, peer_id: &PeerId) {
let to_remove = self
.pending_responses
.keys()
.filter(|(peer, _key)| peer == peer_id)
.cloned()
.collect::<Vec<_>>();
to_remove.iter().for_each(|k| {
self.pending_responses.remove(k);
});
}

pub fn len(&self) -> usize {
@@ -104,13 +119,13 @@
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.pending_responses.poll_next_unpin(cx) {
Poll::Ready(Some((peer_id, (request, response)))) => {
Poll::Ready(Some(((peer_id, key), (request, response)))) => {
// We need to manually remove the stream, because `StreamMap` doesn't know yet that
// it's going to yield `None`, so may not remove it before the next request is made
// to the same peer.
self.pending_responses.remove(&peer_id);
self.pending_responses.remove(&(peer_id, key));

Poll::Ready(Some(ResponseEvent { peer_id, request, response }))
Poll::Ready(Some(ResponseEvent { peer_id, key, request, response }))
},
Poll::Ready(None) | Poll::Pending => {
self.waker = Some(cx.waker().clone());
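To see the `(PeerId, Key)` keying and the manual `remove` in `poll_next` at work, a hedged usage sketch built on `tokio_stream::StreamMap`, the same container the implementation uses. It assumes `tokio` (with the `rt` and `macros` features), `tokio-stream`, and `futures` as dependencies; `PeerId` and `Key` are simplified stand-ins, and the one-shot response futures are mocked:

```rust
use futures::{future::BoxFuture, FutureExt, StreamExt};
use tokio_stream::StreamMap;

type PeerId = u64; // stand-in for `libp2p::PeerId`
type Key = &'static str; // stand-in for the strategy `Key`

#[tokio::main]
async fn main() {
    let mut pending: StreamMap<(PeerId, Key), _> = StreamMap::new();

    // Two strategies with in-flight requests to the *same* peer: distinct
    // keys keep one insert from clobbering the other (keying by `PeerId`
    // alone, as before this PR, would have replaced the first entry).
    let block_rsp: BoxFuture<'static, &str> = async { "block response" }.boxed();
    let state_rsp: BoxFuture<'static, &str> = async { "state response" }.boxed();
    pending.insert((7, "chain-sync"), block_rsp.into_stream().boxed());
    pending.insert((7, "state"), state_rsp.into_stream().boxed());

    while let Some(((peer, key), response)) = pending.next().await {
        println!("peer {peer}, strategy {key:?}: {response}");
        // Mirrors the manual removal in `poll_next` above: the one-shot
        // stream is exhausted, so drop it before a new request reuses the
        // same `(peer, key)` slot.
        pending.remove(&(peer, key));
    }
    // An empty `StreamMap` yields `None`, which is why the real
    // `PendingResponses` stores a waker and reports `Poll::Pending` instead.
}
```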