Skip to content

Commit

Permalink
Prepare syncing for parallel sync strategies (paritytech#3224)
Browse files Browse the repository at this point in the history
This PR should supersede
paritytech#2814 and accomplish the
same with less changes. It's needed to run sync strategies in parallel,
like running `ChainSync` and `GapSync` as independent strategies, and
running `ChainSync` and Sync 2.0 alongside each other.

The difference with paritytech#2814
is that we allow simultaneous requests to remote peers initiated by
different strategies, as this is not tracked on the remote node in any
way. Therefore, `PeerPool` is not needed.

CC @skunert

---------

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
  • Loading branch information
dmitry-markin and skunert committed Feb 13, 2024
1 parent e0c902e commit 96ebb30
Show file tree
Hide file tree
Showing 8 changed files with 542 additions and 383 deletions.
143 changes: 76 additions & 67 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
},
strategy::{
warp::{EncodedProof, WarpProofRequest, WarpSyncParams},
SyncingAction, SyncingConfig, SyncingStrategy,
StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy,
},
types::{
BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
Expand All @@ -48,7 +48,7 @@ use futures::{
FutureExt, StreamExt,
};
use libp2p::{request_response::OutboundFailure, PeerId};
use log::{debug, error, trace};
use log::{debug, error, trace, warn};
use prometheus_endpoint::{
register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
};
Expand Down Expand Up @@ -214,9 +214,6 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Syncing strategy.
strategy: SyncingStrategy<B, Client>,

/// Syncing configuration for startegies.
syncing_config: SyncingConfig,

/// Blockchain client.
client: Arc<Client>,

Expand Down Expand Up @@ -441,8 +438,7 @@ where
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());

// Initialize syncing strategy.
let strategy =
SyncingStrategy::new(syncing_config.clone(), client.clone(), warp_sync_config)?;
let strategy = SyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?;

let block_announce_protocol_name = block_announce_config.protocol_name().clone();
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
Expand Down Expand Up @@ -471,7 +467,6 @@ where
roles,
client,
strategy,
syncing_config,
network_service,
peers: HashMap::new(),
block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
Expand Down Expand Up @@ -661,8 +656,15 @@ where
Some(event) => self.process_notification_event(event),
None => return,
},
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused =>
self.pass_warp_sync_target_block_header(warp_target_block_header),
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused => {
if let Err(_) = self.pass_warp_sync_target_block_header(warp_target_block_header) {
error!(
target: LOG_TARGET,
"Failed to set warp sync target block header, terminating `SyncingEngine`.",
);
return
}
},
response_event = self.pending_responses.select_next_some() =>
self.process_response_event(response_event),
validation_result = self.block_announce_validator.select_next_some() =>
Expand All @@ -675,48 +677,61 @@ where

// Process actions requested by a syncing strategy.
if let Err(e) = self.process_strategy_actions() {
error!("Terminating `SyncingEngine` due to fatal error: {e:?}");
error!(
target: LOG_TARGET,
"Terminating `SyncingEngine` due to fatal error: {e:?}.",
);
return
}
}
}

fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
for action in self.strategy.actions() {
for action in self.strategy.actions()? {
match action {
SyncingAction::SendBlockRequest { peer_id, request } => {
SyncingAction::SendBlockRequest { peer_id, key, request } => {
// Sending block request implies dropping obsolete pending response as we are
// not interested in it anymore (see [`SyncingAction::SendBlockRequest`]).
// Furthermore, only one request at a time is allowed to any peer.
let removed = self.pending_responses.remove(&peer_id);
self.send_block_request(peer_id, request.clone());

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {} with {:?}, stale response removed: {}.",
peer_id,
request,
removed,
)
let removed = self.pending_responses.remove(peer_id, key);
self.send_block_request(peer_id, key, request.clone());

if removed {
warn!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}. \
Stale response removed!",
peer_id,
key,
request,
)
} else {
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}.",
peer_id,
key,
request,
)
}
},
SyncingAction::CancelBlockRequest { peer_id } => {
let removed = self.pending_responses.remove(&peer_id);
SyncingAction::CancelRequest { peer_id, key } => {
let removed = self.pending_responses.remove(peer_id, key);

trace!(
target: LOG_TARGET,
"Processed {action:?}, response removed: {removed}.",
);
},
SyncingAction::SendStateRequest { peer_id, request } => {
self.send_state_request(peer_id, request);
SyncingAction::SendStateRequest { peer_id, key, request } => {
self.send_state_request(peer_id, key, request);

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.",
"Processed `ChainSyncAction::SendStateRequest` to {peer_id}.",
);
},
SyncingAction::SendWarpProofRequest { peer_id, request } => {
self.send_warp_proof_request(peer_id, request.clone());
SyncingAction::SendWarpProofRequest { peer_id, key, request } => {
self.send_warp_proof_request(peer_id, key, request.clone());

trace!(
target: LOG_TARGET,
Expand All @@ -726,7 +741,7 @@ where
);
},
SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
self.pending_responses.remove(&peer_id);
self.pending_responses.remove_all(&peer_id);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(peer_id, rep);
Expand All @@ -753,20 +768,8 @@ where
number,
)
},
SyncingAction::Finished => {
let connected_peers = self.peers.iter().filter_map(|(peer_id, peer)| {
peer.info.roles.is_full().then_some((
*peer_id,
peer.info.best_hash,
peer.info.best_number,
))
});
self.strategy.switch_to_next(
self.syncing_config.clone(),
self.client.clone(),
connected_peers,
)?;
},
// Nothing to do, this is handled internally by `SyncingStrategy`.
SyncingAction::Finished => {},
}
}

Expand Down Expand Up @@ -948,23 +951,18 @@ where
}
}

fn pass_warp_sync_target_block_header(&mut self, header: Result<B::Header, oneshot::Canceled>) {
fn pass_warp_sync_target_block_header(
&mut self,
header: Result<B::Header, oneshot::Canceled>,
) -> Result<(), ()> {
match header {
Ok(header) =>
if let SyncingStrategy::WarpSyncStrategy(warp_sync) = &mut self.strategy {
warp_sync.set_target_block(header);
} else {
error!(
target: LOG_TARGET,
"Cannot set warp sync target block: no warp sync strategy is active."
);
debug_assert!(false);
},
Ok(header) => self.strategy.set_warp_sync_target_block_header(header),
Err(err) => {
error!(
target: LOG_TARGET,
"Failed to get target block for warp sync. Error: {err:?}",
);
Err(())
},
}
}
Expand Down Expand Up @@ -1002,7 +1000,7 @@ where
}

self.strategy.remove_peer(&peer_id);
self.pending_responses.remove(&peer_id);
self.pending_responses.remove_all(&peer_id);
self.event_streams
.retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
}
Expand Down Expand Up @@ -1167,7 +1165,7 @@ where
Ok(())
}

fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest<B>) {
fn send_block_request(&mut self, peer_id: PeerId, key: StrategyKey, request: BlockRequest<B>) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}");
debug_assert!(false);
Expand All @@ -1178,12 +1176,18 @@ where

self.pending_responses.insert(
peer_id,
key,
PeerRequest::Block(request.clone()),
async move { downloader.download_blocks(peer_id, request).await }.boxed(),
);
}

fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) {
fn send_state_request(
&mut self,
peer_id: PeerId,
key: StrategyKey,
request: OpaqueStateRequest,
) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}");
debug_assert!(false);
Expand All @@ -1192,7 +1196,7 @@ where

let (tx, rx) = oneshot::channel();

self.pending_responses.insert(peer_id, PeerRequest::State, rx.boxed());
self.pending_responses.insert(peer_id, key, PeerRequest::State, rx.boxed());

match Self::encode_state_request(&request) {
Ok(data) => {
Expand All @@ -1213,7 +1217,12 @@ where
}
}

fn send_warp_proof_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) {
fn send_warp_proof_request(
&mut self,
peer_id: PeerId,
key: StrategyKey,
request: WarpProofRequest<B>,
) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}");
debug_assert!(false);
Expand All @@ -1222,7 +1231,7 @@ where

let (tx, rx) = oneshot::channel();

self.pending_responses.insert(peer_id, PeerRequest::WarpProof, rx.boxed());
self.pending_responses.insert(peer_id, key, PeerRequest::WarpProof, rx.boxed());

match &self.warp_sync_protocol_name {
Some(name) => self.network_service.start_request(
Expand Down Expand Up @@ -1259,14 +1268,14 @@ where
}

fn process_response_event(&mut self, response_event: ResponseEvent<B>) {
let ResponseEvent { peer_id, request, response } = response_event;
let ResponseEvent { peer_id, key, request, response } = response_event;

match response {
Ok(Ok((resp, _))) => match request {
PeerRequest::Block(req) => {
match self.block_downloader.block_response_into_blocks(&req, resp) {
Ok(blocks) => {
self.strategy.on_block_response(peer_id, req, blocks);
self.strategy.on_block_response(peer_id, key, req, blocks);
},
Err(BlockResponseError::DecodeFailed(e)) => {
debug!(
Expand Down Expand Up @@ -1311,10 +1320,10 @@ where
},
};

self.strategy.on_state_response(peer_id, response);
self.strategy.on_state_response(peer_id, key, response);
},
PeerRequest::WarpProof => {
self.strategy.on_warp_proof_response(&peer_id, EncodedProof(resp));
self.strategy.on_warp_proof_response(&peer_id, key, EncodedProof(resp));
},
},
Ok(Err(e)) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Justification requests scheduling. [`ExtraRequests`] manages requesting justifications
//! from peers taking into account forks and their finalization (dropping pending requests
//! that don't make sense after one of the forks is finalized).

use crate::{
request_metrics::Metrics,
strategy::chain_sync::{PeerSync, PeerSyncState},
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ pub use strategy::warp::{WarpSyncParams, WarpSyncPhase, WarpSyncProgress};
pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider};

mod block_announce_validator;
mod extra_requests;
mod futures_stream;
mod justification_requests;
mod pending_responses;
mod request_metrics;
mod schema;
Expand Down
Loading

0 comments on commit 96ebb30

Please sign in to comment.