Skip to content

Commit

Permalink
Modular block request handler (paritytech#1524)
Browse files Browse the repository at this point in the history
Submit the outstanding PRs from the old repos(these were already
reviewed and approved before the repo rorg, but not yet submitted):
Main PR: paritytech/substrate#14014
Companion PRs: paritytech/polkadot#7134,
paritytech/cumulus#2489

The changes in the PR:
1. ChainSync currently calls into the block request handler directly.
Instead, move the block request handler behind a trait. This allows new
protocols to be plugged into ChainSync.
2. BuildNetworkParams is changed so that custom relay protocol
implementations can be (optionally) passed in during network creation
time. If custom protocol is not specified, it defaults to the existing
block handler
3. BlockServer and BlockDownloader traits are introduced for the
protocol implementation. The existing block handler has been changed to
implement these traits
4. Other changes:
[X] Make TxHash serializable. This is needed for exchanging the
serialized hash in the relay protocol messages
[X] Clean up types no longer used(OpaqueBlockRequest,
OpaqueBlockResponse)

---------

Co-authored-by: Dmitry Markin <dmitry@markin.tech>
Co-authored-by: command-bot <>
  • Loading branch information
rahulksnv and dmitry-markin authored Sep 15, 2023
1 parent eb2a523 commit 85fe3e6
Show file tree
Hide file tree
Showing 12 changed files with 368 additions and 256 deletions.
1 change: 1 addition & 0 deletions substrate/bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
import_queue,
block_announce_validator_builder: None,
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
block_relay: None,
})?;

if config.offchain_worker.enabled {
Expand Down
1 change: 1 addition & 0 deletions substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ pub fn new_full_base(
import_queue,
block_announce_validator_builder: None,
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
block_relay: None,
})?;

let role = config.role.clone();
Expand Down
33 changes: 2 additions & 31 deletions substrate/client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ pub mod message;
pub mod metrics;
pub mod warp;

use crate::{role::Roles, sync::message::BlockAnnounce, types::ReputationChange};
use crate::{role::Roles, types::ReputationChange};
use futures::Stream;

use libp2p_identity::PeerId;

use message::{BlockData, BlockRequest, BlockResponse};
use message::{BlockAnnounce, BlockRequest, BlockResponse};
use sc_consensus::{import_queue::RuntimeOrigin, IncomingBlock};
use sp_consensus::BlockOrigin;
use sp_runtime::{
Expand Down Expand Up @@ -226,28 +226,6 @@ impl fmt::Debug for OpaqueStateResponse {
}
}

/// Wrapper for implementation-specific block request.
///
/// NOTE: Implementation must be able to encode and decode it for network purposes.
pub struct OpaqueBlockRequest(pub Box<dyn Any + Send>);

impl fmt::Debug for OpaqueBlockRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("OpaqueBlockRequest").finish()
}
}

/// Wrapper for implementation-specific block response.
///
/// NOTE: Implementation must be able to encode and decode it for network purposes.
pub struct OpaqueBlockResponse(pub Box<dyn Any + Send>);

impl fmt::Debug for OpaqueBlockResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("OpaqueBlockResponse").finish()
}
}

/// Provides high-level status of syncing.
#[async_trait::async_trait]
pub trait SyncStatusProvider<Block: BlockT>: Send + Sync {
Expand Down Expand Up @@ -392,13 +370,6 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Return some key metrics.
fn metrics(&self) -> Metrics;

/// Access blocks from implementation-specific block response.
fn block_response_into_blocks(
&self,
request: &BlockRequest<Block>,
response: OpaqueBlockResponse,
) -> Result<Vec<BlockData<Block>>, String>;

/// Advance the state of `ChainSync`
fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()>;

Expand Down
72 changes: 72 additions & 0 deletions substrate/client/network/sync/src/block_relay_protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

//! Block relay protocol related definitions.

use futures::channel::oneshot;
use libp2p::PeerId;
use sc_network::request_responses::{ProtocolConfig, RequestFailure};
use sc_network_common::sync::message::{BlockData, BlockRequest};
use sp_runtime::traits::Block as BlockT;
use std::sync::Arc;

/// The serving side of the block relay protocol. It runs a single instance
/// of the server task that processes the incoming protocol messages.
#[async_trait::async_trait]
pub trait BlockServer<Block: BlockT>: Send {
/// Starts the protocol processing.
async fn run(&mut self);
}

/// The client side stub to download blocks from peers. This is a handle
/// that can be used to initiate concurrent downloads.
#[async_trait::async_trait]
pub trait BlockDownloader<Block: BlockT>: Send + Sync {
/// Performs the protocol specific sequence to fetch the blocks from the peer.
/// Output: if the download succeeds, the response is a `Vec<u8>` which is
/// in a format specific to the protocol implementation. The block data
/// can be extracted from this response using [`BlockDownloader::block_response_into_blocks`].
async fn download_blocks(
&self,
who: PeerId,
request: BlockRequest<Block>,
) -> Result<Result<Vec<u8>, RequestFailure>, oneshot::Canceled>;

/// Parses the protocol specific response to retrieve the block data.
fn block_response_into_blocks(
&self,
request: &BlockRequest<Block>,
response: Vec<u8>,
) -> Result<Vec<BlockData<Block>>, BlockResponseError>;
}

/// Errors returned by [`BlockDownloader::block_response_into_blocks`].
#[derive(Debug)]
pub enum BlockResponseError {
/// Failed to decode the response bytes.
DecodeFailed(String),

/// Failed to extract the blocks from the decoded bytes.
ExtractionFailed(String),
}

/// Block relay specific params for network creation, specified in
/// ['sc_service::BuildNetworkParams'].
pub struct BlockRelayParams<Block: BlockT> {
pub server: Box<dyn BlockServer<Block>>,
pub downloader: Arc<dyn BlockDownloader<Block>>,
pub request_response_config: ProtocolConfig,
}
168 changes: 155 additions & 13 deletions substrate/client/network/sync/src/block_request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,35 @@
//! `crate::request_responses::RequestResponsesBehaviour`.

use crate::{
schema::v1::{block_request::FromBlock, BlockResponse, Direction},
block_relay_protocol::{BlockDownloader, BlockRelayParams, BlockResponseError, BlockServer},
schema::v1::{
block_request::FromBlock as FromBlockSchema, BlockRequest as BlockRequestSchema,
BlockResponse as BlockResponseSchema, BlockResponse, Direction,
},
service::network::NetworkServiceHandle,
MAX_BLOCKS_IN_RESPONSE,
};

use codec::{Decode, Encode};
use codec::{Decode, DecodeAll, Encode};
use futures::{channel::oneshot, stream::StreamExt};
use libp2p::PeerId;
use log::debug;
use prost::Message;
use schnellru::{ByLength, LruMap};

use sc_client_api::BlockBackend;
use sc_network::{
config::ProtocolId,
request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
request_responses::{
IfDisconnected, IncomingRequest, OutgoingResponse, ProtocolConfig, RequestFailure,
},
types::ProtocolName,
};
use sc_network_common::sync::message::BlockAttributes;
use sc_network_common::sync::message::{BlockAttributes, BlockData, BlockRequest, FromBlock};
use schnellru::{ByLength, LruMap};
use sp_blockchain::HeaderBackend;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header, One, Zero},
};

use std::{
cmp::min,
hash::{Hash, Hasher},
Expand Down Expand Up @@ -129,7 +135,8 @@ enum SeenRequestsValue {
Fulfilled(usize),
}

/// Handler for incoming block requests from a remote peer.
/// The full block server implementation of [`BlockServer`]. It handles
/// the incoming block requests from a remote peer.
pub struct BlockRequestHandler<B: BlockT, Client> {
client: Arc<Client>,
request_receiver: async_channel::Receiver<IncomingRequest>,
Expand All @@ -146,11 +153,12 @@ where
{
/// Create a new [`BlockRequestHandler`].
pub fn new(
network: NetworkServiceHandle,
protocol_id: &ProtocolId,
fork_id: Option<&str>,
client: Arc<Client>,
num_peer_hint: usize,
) -> (Self, ProtocolConfig) {
) -> BlockRelayParams<B> {
// Reserve enough request slots for one request per peer when we are at the maximum
// number of peers.
let capacity = std::cmp::max(num_peer_hint, 1);
Expand All @@ -170,11 +178,15 @@ where
let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2);
let seen_requests = LruMap::new(capacity);

(Self { client, request_receiver, seen_requests }, protocol_config)
BlockRelayParams {
server: Box::new(Self { client, request_receiver, seen_requests }),
downloader: Arc::new(FullBlockDownloader::new(protocol_config.name.clone(), network)),
request_response_config: protocol_config,
}
}

/// Run [`BlockRequestHandler`].
pub async fn run(mut self) {
async fn process_requests(&mut self) {
while let Some(request) = self.request_receiver.next().await {
let IncomingRequest { peer, payload, pending_response } = request;

Expand All @@ -197,11 +209,11 @@ where
let request = crate::schema::v1::BlockRequest::decode(&payload[..])?;

let from_block_id = match request.from_block.ok_or(HandleRequestError::MissingFromField)? {
FromBlock::Hash(ref h) => {
FromBlockSchema::Hash(ref h) => {
let h = Decode::decode(&mut h.as_ref())?;
BlockId::<B>::Hash(h)
},
FromBlock::Number(ref n) => {
FromBlockSchema::Number(ref n) => {
let n = Decode::decode(&mut n.as_ref())?;
BlockId::<B>::Number(n)
},
Expand Down Expand Up @@ -448,6 +460,17 @@ where
}
}

#[async_trait::async_trait]
impl<B, Client> BlockServer<B> for BlockRequestHandler<B, Client>
where
B: BlockT,
Client: HeaderBackend<B> + BlockBackend<B> + Send + Sync + 'static,
{
async fn run(&mut self) {
self.process_requests().await;
}
}

#[derive(Debug, thiserror::Error)]
enum HandleRequestError {
#[error("Failed to decode request: {0}.")]
Expand All @@ -465,3 +488,122 @@ enum HandleRequestError {
#[error("Failed to send response.")]
SendResponse,
}

/// The full block downloader implementation of [`BlockDownloader].
pub struct FullBlockDownloader {
protocol_name: ProtocolName,
network: NetworkServiceHandle,
}

impl FullBlockDownloader {
fn new(protocol_name: ProtocolName, network: NetworkServiceHandle) -> Self {
Self { protocol_name, network }
}

/// Extracts the blocks from the response schema.
fn blocks_from_schema<B: BlockT>(
&self,
request: &BlockRequest<B>,
response: BlockResponseSchema,
) -> Result<Vec<BlockData<B>>, String> {
response
.blocks
.into_iter()
.map(|block_data| {
Ok(BlockData::<B> {
hash: Decode::decode(&mut block_data.hash.as_ref())?,
header: if !block_data.header.is_empty() {
Some(Decode::decode(&mut block_data.header.as_ref())?)
} else {
None
},
body: if request.fields.contains(BlockAttributes::BODY) {
Some(
block_data
.body
.iter()
.map(|body| Decode::decode(&mut body.as_ref()))
.collect::<Result<Vec<_>, _>>()?,
)
} else {
None
},
indexed_body: if request.fields.contains(BlockAttributes::INDEXED_BODY) {
Some(block_data.indexed_body)
} else {
None
},
receipt: if !block_data.receipt.is_empty() {
Some(block_data.receipt)
} else {
None
},
message_queue: if !block_data.message_queue.is_empty() {
Some(block_data.message_queue)
} else {
None
},
justification: if !block_data.justification.is_empty() {
Some(block_data.justification)
} else if block_data.is_empty_justification {
Some(Vec::new())
} else {
None
},
justifications: if !block_data.justifications.is_empty() {
Some(DecodeAll::decode_all(&mut block_data.justifications.as_ref())?)
} else {
None
},
})
})
.collect::<Result<_, _>>()
.map_err(|error: codec::Error| error.to_string())
}
}

#[async_trait::async_trait]
impl<B: BlockT> BlockDownloader<B> for FullBlockDownloader {
async fn download_blocks(
&self,
who: PeerId,
request: BlockRequest<B>,
) -> Result<Result<Vec<u8>, RequestFailure>, oneshot::Canceled> {
// Build the request protobuf.
let bytes = BlockRequestSchema {
fields: request.fields.to_be_u32(),
from_block: match request.from {
FromBlock::Hash(h) => Some(FromBlockSchema::Hash(h.encode())),
FromBlock::Number(n) => Some(FromBlockSchema::Number(n.encode())),
},
direction: request.direction as i32,
max_blocks: request.max.unwrap_or(0),
support_multiple_justifications: true,
}
.encode_to_vec();

let (tx, rx) = oneshot::channel();
self.network.start_request(
who,
self.protocol_name.clone(),
bytes,
tx,
IfDisconnected::ImmediateError,
);
rx.await
}

fn block_response_into_blocks(
&self,
request: &BlockRequest<B>,
response: Vec<u8>,
) -> Result<Vec<BlockData<B>>, BlockResponseError> {
// Decode the response protobuf
let response_schema = BlockResponseSchema::decode(response.as_slice())
.map_err(|error| BlockResponseError::DecodeFailed(error.to_string()))?;

// Extract the block data from the protobuf
self.blocks_from_schema::<B>(request, response_schema)
.map_err(|error| BlockResponseError::ExtractionFailed(error.to_string()))
}
}
Loading

0 comments on commit 85fe3e6

Please sign in to comment.