Skip to content

Commit

Permalink
Move import queue from ChainSync to SyncingEngine (paritytech#1736)
Browse files Browse the repository at this point in the history
This PR is part of [Sync
2.0](paritytech#534) refactoring
aimed at making `ChainSync` a pure state machine.

Resolves paritytech#501.
  • Loading branch information
dmitry-markin authored Sep 29, 2023
1 parent 22dd85f commit c098425
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 171 deletions.
17 changes: 14 additions & 3 deletions substrate/client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,18 @@ impl fmt::Display for BadPeer {

impl std::error::Error for BadPeer {}

/// Action that the parent of [`ChainSync`] should perform if we want to import blocks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImportBlocksAction<B: BlockT> {
pub origin: BlockOrigin,
pub blocks: Vec<IncomingBlock<B>>,
}

/// Result of [`ChainSync::on_block_data`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OnBlockData<Block: BlockT> {
/// The block should be imported.
Import(BlockOrigin, Vec<IncomingBlock<Block>>),
Import(ImportBlocksAction<Block>),
/// A new block request needs to be made to the given peer.
Request(PeerId, BlockRequest<Block>),
/// Continue processing events.
Expand All @@ -134,7 +141,7 @@ pub enum OnBlockJustification<Block: BlockT> {
Nothing,
/// The justification should be imported.
Import {
peer: PeerId,
peer_id: PeerId,
hash: Block::Hash,
number: NumberFor<Block>,
justifications: Justifications,
Expand Down Expand Up @@ -309,6 +316,7 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Handle a new connected peer.
///
/// Call this method whenever we connect to a new peer.
#[must_use]
fn new_peer(
&mut self,
who: PeerId,
Expand Down Expand Up @@ -340,6 +348,7 @@ pub trait ChainSync<Block: BlockT>: Send {
///
/// If this corresponds to a valid block, this outputs the block that
/// must be imported in the import queue.
#[must_use]
fn on_block_data(
&mut self,
who: &PeerId,
Expand All @@ -350,6 +359,7 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Handle a response from the remote to a justification request that we made.
///
/// `request` must be the original request that triggered `response`.
#[must_use]
fn on_block_justification(
&mut self,
who: PeerId,
Expand Down Expand Up @@ -379,7 +389,8 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Call when a peer has disconnected.
/// Canceled obsolete block request may result in some blocks being ready for
/// import, so this functions checks for such blocks and returns them.
fn peer_disconnected(&mut self, who: &PeerId);
#[must_use]
fn peer_disconnected(&mut self, who: &PeerId) -> Option<ImportBlocksAction<Block>>;

/// Return some key metrics.
fn metrics(&self) -> Metrics;
Expand Down
77 changes: 64 additions & 13 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use crate::{
schema::v1::{StateRequest, StateResponse},
service::{self, chain_sync::ToServiceCommand},
warp::WarpSyncParams,
BlockRequestEvent, ChainSync, ClientError, SyncingService,
BlockRequestAction, ChainSync, ClientError, ImportBlocksAction, ImportJustificationsAction,
OnBlockResponse, SyncingService,
};

use codec::{Decode, Encode};
Expand All @@ -41,7 +42,8 @@ use futures_timer::Delay;
use libp2p::{request_response::OutboundFailure, PeerId};
use log::{debug, trace};
use prometheus_endpoint::{
register, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
register, Counter, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry,
SourcedGauge, U64,
};
use prost::Message;
use schnellru::{ByLength, LruMap};
Expand Down Expand Up @@ -135,6 +137,8 @@ struct Metrics {
queued_blocks: Gauge<U64>,
fork_targets: Gauge<U64>,
justifications: GaugeVec<U64>,
import_queue_blocks_submitted: Counter<U64>,
import_queue_justifications_submitted: Counter<U64>,
}

impl Metrics {
Expand Down Expand Up @@ -164,6 +168,20 @@ impl Metrics {
)?;
register(g, r)?
},
import_queue_blocks_submitted: {
let c = Counter::new(
"substrate_sync_import_queue_blocks_submitted",
"Number of blocks submitted to the import queue.",
)?;
register(c, r)?
},
import_queue_justifications_submitted: {
let c = Counter::new(
"substrate_sync_import_queue_justifications_submitted",
"Number of justifications submitted to the import queue.",
)?;
register(c, r)?
},
})
}
}
Expand Down Expand Up @@ -311,6 +329,9 @@ pub struct SyncingEngine<B: BlockT, Client> {

/// Protocol name used to send out warp sync requests
warp_sync_protocol_name: Option<ProtocolName>,

/// Handle to import queue.
import_queue: Box<dyn ImportQueueService<B>>,
}

impl<B: BlockT, Client> SyncingEngine<B, Client>
Expand Down Expand Up @@ -436,9 +457,7 @@ where
max_parallel_downloads,
max_blocks_per_request,
warp_sync_config,
metrics_registry,
network_service.clone(),
import_queue,
)?;

let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
Expand Down Expand Up @@ -501,6 +520,7 @@ where
block_downloader,
state_request_protocol_name,
warp_sync_protocol_name,
import_queue,
},
SyncingService::new(tx, num_connected, is_major_syncing),
block_announce_config,
Expand Down Expand Up @@ -728,13 +748,13 @@ where
ToServiceCommand::BlocksProcessed(imported, count, results) => {
for result in self.chain_sync.on_blocks_processed(imported, count, results) {
match result {
Ok(event) => match event {
BlockRequestEvent::SendRequest { peer_id, request } => {
Ok(action) => match action {
BlockRequestAction::SendRequest { peer_id, request } => {
// drop obsolete pending response first
self.pending_responses.remove(&peer_id);
self.send_block_request(peer_id, request);
},
BlockRequestEvent::RemoveStale { peer_id } => {
BlockRequestAction::RemoveStale { peer_id } => {
self.pending_responses.remove(&peer_id);
},
},
Expand Down Expand Up @@ -922,7 +942,10 @@ where
}
}

self.chain_sync.peer_disconnected(&peer_id);
if let Some(import_blocks_action) = self.chain_sync.peer_disconnected(&peer_id) {
self.import_blocks(import_blocks_action)
}

self.pending_responses.remove(&peer_id);
self.event_streams.retain(|stream| {
stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok()
Expand Down Expand Up @@ -1181,10 +1204,14 @@ where
PeerRequest::Block(req) => {
match self.block_downloader.block_response_into_blocks(&req, resp) {
Ok(blocks) => {
if let Some((peer_id, new_req)) =
self.chain_sync.on_block_response(peer_id, req, blocks)
{
self.send_block_request(peer_id, new_req);
match self.chain_sync.on_block_response(peer_id, req, blocks) {
OnBlockResponse::SendBlockRequest { peer_id, request } =>
self.send_block_request(peer_id, request),
OnBlockResponse::ImportBlocks(import_blocks_action) =>
self.import_blocks(import_blocks_action),
OnBlockResponse::ImportJustifications(action) =>
self.import_justifications(action),
OnBlockResponse::Nothing => {},
}
},
Err(BlockResponseError::DecodeFailed(e)) => {
Expand Down Expand Up @@ -1230,7 +1257,11 @@ where
},
};

self.chain_sync.on_state_response(peer_id, response);
if let Some(import_blocks_action) =
self.chain_sync.on_state_response(peer_id, response)
{
self.import_blocks(import_blocks_action);
}
},
PeerRequest::WarpProof => {
self.chain_sync.on_warp_sync_response(peer_id, EncodedProof(resp));
Expand Down Expand Up @@ -1337,4 +1368,24 @@ where
},
}
}

/// Import blocks.
fn import_blocks(&mut self, ImportBlocksAction { origin, blocks }: ImportBlocksAction<B>) {
if let Some(metrics) = &self.metrics {
metrics.import_queue_blocks_submitted.inc();
}

self.import_queue.import_blocks(origin, blocks);
}

/// Import justifications.
fn import_justifications(&mut self, action: ImportJustificationsAction<B>) {
if let Some(metrics) = &self.metrics {
metrics.import_queue_justifications_submitted.inc();
}

let ImportJustificationsAction { peer_id, hash, number, justifications } = action;

self.import_queue.import_justifications(peer_id, hash, number, justifications);
}
}
Loading

0 comments on commit c098425

Please sign in to comment.