diff --git a/Cargo.lock b/Cargo.lock index 335e8b3526792..08915196d5374 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -927,9 +927,9 @@ dependencies = [ [[package]] name = "cid" -version = "0.8.4" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a52cffa791ce5cf490ac3b2d6df970dc04f931b04e727be3c3e220e17164dfc4" +checksum = "f6ed9c8b2d17acb8110c46f1da5bf4a696d745e1474a16db0cd2b49cd0249bf2" dependencies = [ "core2", "multibase", @@ -3591,8 +3591,8 @@ dependencies = [ "libp2p-request-response", "libp2p-swarm", "log", - "prost", - "prost-build", + "prost 0.10.3", + "prost-build 0.10.4", "rand 0.8.5", ] @@ -3618,8 +3618,8 @@ dependencies = [ "multistream-select", "parking_lot 0.12.1", "pin-project", - "prost", - "prost-build", + "prost 0.10.3", + "prost-build 0.10.4", "rand 0.8.5", "ring", "rw-stream-sink", @@ -3669,8 +3669,8 @@ dependencies = [ "libp2p-core", "libp2p-swarm", "log", - "prost", - "prost-build", + "prost 0.10.3", + "prost-build 0.10.4", "rand 0.7.3", "smallvec", ] @@ -3693,8 +3693,8 @@ dependencies = [ "libp2p-swarm", "log", "prometheus-client", - "prost", - "prost-build", + "prost 0.10.3", + "prost-build 0.10.4", "rand 0.7.3", "regex", "sha2 0.10.2", @@ -3716,8 +3716,8 @@ dependencies = [ "libp2p-swarm", "log", "lru", - "prost", - "prost-build", + "prost 0.10.3", + "prost-build 0.10.4", "prost-codec", "smallvec", "thiserror", @@ -3741,8 +3741,8 @@ dependencies = [ "libp2p-core", "libp2p-swarm", "log", - "prost", - "prost-build", + "prost 0.10.3", + "prost-build 0.10.4", "rand 0.7.3", "sha2 0.10.2", "smallvec", @@ -3819,8 +3819,8 @@ dependencies = [ "lazy_static", "libp2p-core", "log", - "prost", - "prost-build", + "prost 0.10.3", + "prost-build 0.10.4", "rand 0.8.5", "sha2 0.10.2", "snow", @@ -3856,8 +3856,8 @@ dependencies = [ "futures", "libp2p-core", "log", - "prost", - "prost-build", + "prost 0.10.3", + "prost-build 0.10.4", "unsigned-varint", "void", ] @@ -3892,8 +3892,8 @@ dependencies = [ "libp2p-swarm", "log", "pin-project", - "prost", - "prost-build", + "prost 0.10.3", + "prost-build 0.10.4", "prost-codec", "rand 0.8.5", "smallvec", @@ -3916,8 +3916,8 @@ dependencies = [ "libp2p-core", "libp2p-swarm", "log", - "prost", - "prost-build", + "prost 0.10.3", + "prost-build 0.10.4", "rand 0.8.5", "sha2 0.10.2", "thiserror", @@ -7061,7 +7061,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc03e116981ff7d8da8e5c220e374587b98d294af7ba7dd7fda761158f00086f" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.10.1", +] + +[[package]] +name = "prost" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7" +dependencies = [ + "bytes", + "prost-derive 0.11.0", ] [[package]] @@ -7079,8 +7089,28 @@ dependencies = [ "log", "multimap", "petgraph", - "prost", - "prost-types", + "prost 0.10.3", + "prost-types 0.10.1", + "regex", + "tempfile", + "which", +] + +[[package]] +name = "prost-build" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f835c582e6bd972ba8347313300219fed5bfa52caf175298d860b61ff6069bb" +dependencies = [ + "bytes", + "heck", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prost 0.11.0", + "prost-types 0.11.1", "regex", "tempfile", "which", @@ -7094,7 +7124,7 @@ checksum = "00af1e92c33b4813cc79fda3f2dbf56af5169709be0202df730e9ebc3e4cd007" dependencies = [ "asynchronous-codec", "bytes", - "prost", + "prost 0.10.3", "thiserror", "unsigned-varint", ] @@ -7112,6 +7142,19 @@ dependencies = [ "syn", ] +[[package]] +name = "prost-derive" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7345d5f0e08c0536d7ac7229952590239e77abf0a0100a1b1d890add6ea96364" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "prost-types" version = "0.10.1" @@ -7119,7 +7162,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68" dependencies = [ "bytes", - "prost", + "prost 0.10.3", +] + +[[package]] +name = "prost-types" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e" +dependencies = [ + "bytes", + "prost 0.11.0", ] [[package]] @@ -7728,8 +7781,8 @@ dependencies = [ "libp2p", "log", "parity-scale-codec", - "prost", - "prost-build", + "prost 0.10.3", + "prost-build 0.10.4", "quickcheck", "rand 0.7.3", "sc-client-api", @@ -8384,8 +8437,7 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.12.1", "pin-project", - "prost", - "prost-build", + "prost 0.10.3", "rand 0.7.3", "sc-block-builder", "sc-client-api", @@ -8411,10 +8463,35 @@ dependencies = [ "tempfile", "thiserror", "unsigned-varint", - "void", "zeroize", ] +[[package]] +name = "sc-network-bitswap" +version = "0.10.0-dev" +dependencies = [ + "cid", + "futures", + "libp2p", + "log", + "prost 0.11.0", + "prost-build 0.11.1", + "sc-block-builder", + "sc-client-api", + "sc-consensus", + "sc-network-common", + "sp-blockchain", + "sp-consensus", + "sp-core", + "sp-runtime", + "substrate-test-runtime", + "substrate-test-runtime-client", + "thiserror", + "tokio", + "unsigned-varint", + "void", +] + [[package]] name = "sc-network-common" version = "0.10.0-dev" @@ -8425,7 +8502,7 @@ dependencies = [ "futures", "libp2p", "parity-scale-codec", - "prost-build", + "prost-build 0.10.4", "sc-consensus", "sc-peerset", "serde", @@ -8466,8 +8543,8 @@ dependencies = [ "libp2p", "log", "parity-scale-codec", - "prost", - "prost-build", + "prost 0.10.3", + "prost-build 0.10.4", "sc-client-api", "sc-network-common", "sc-peerset", @@ -8488,8 +8565,8 @@ dependencies = [ "log", "lru", "parity-scale-codec", - "prost", - "prost-build", + "prost 0.10.3", + "prost-build 0.10.4", "quickcheck", "sc-block-builder", "sc-client-api", @@ -8712,6 +8789,7 @@ dependencies = [ "sc-informant", "sc-keystore", "sc-network", + "sc-network-bitswap", "sc-network-common", "sc-network-light", "sc-network-sync", diff --git a/Cargo.toml b/Cargo.toml index e2907716ca9f2..9c71dbdf4b52c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ members = [ "client/keystore", "client/network", "client/network-gossip", + "client/network/bitswap", "client/network/common", "client/network/light", "client/network/sync", diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 82f77fa44450f..ae115220d6843 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -13,9 +13,6 @@ readme = "README.md" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] -[build-dependencies] -prost-build = "0.10" - [dependencies] async-trait = "0.1" asynchronous-codec = "0.6" @@ -43,7 +40,6 @@ serde_json = "1.0.85" smallvec = "1.8.0" thiserror = "1.0" unsigned-varint = { version = "0.7.1", features = ["futures", "asynchronous_codec"] } -void = "1.0.2" zeroize = "1.4.3" fork-tree = { version = "3.0.0", path = "../../utils/fork-tree" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" } diff --git a/client/network/bitswap/Cargo.toml b/client/network/bitswap/Cargo.toml new file mode 100644 index 0000000000000..9fa34aab17dfb --- /dev/null +++ b/client/network/bitswap/Cargo.toml @@ -0,0 +1,40 @@ +[package] +description = "Substrate bitswap protocol" +name = "sc-network-bitswap" +version = "0.10.0-dev" +license = "GPL-3.0-or-later WITH Classpath-exception-2.0" +authors = ["Parity Technologies "] +edition = "2021" +homepage = "https://substrate.io" +repository = "https://github.com/paritytech/substrate/" +documentation = "https://docs.rs/sc-network-bitswap" +readme = "README.md" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[build-dependencies] +prost-build = "0.11" + +[dependencies] +cid = "0.8.6" +futures = "0.3.21" +libp2p = "0.46.1" +log = "0.4.17" +prost = "0.11" +thiserror = "1.0" +unsigned-varint = { version = "0.7.1", features = ["futures", "asynchronous_codec"] } +void = "1.0.2" +sc-client-api = { version = "4.0.0-dev", path = "../../api" } +sc-network-common = { version = "0.10.0-dev", path = "../common" } +sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" } +sp-runtime = { version = "6.0.0", path = "../../../primitives/runtime" } + +[dev-dependencies] +tokio = { version = "1", features = ["full"] } +sc-block-builder = { version = "0.10.0-dev", path = "../../block-builder" } +sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } +sp-core = { version = "6.0.0", path = "../../../primitives/core" } +sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } +substrate-test-runtime = { version = "2.0.0", path = "../../../test-utils/runtime" } +substrate-test-runtime-client = { version = "2.0.0", path = "../../../test-utils/runtime/client" } diff --git a/client/network/build.rs b/client/network/bitswap/build.rs similarity index 56% rename from client/network/build.rs rename to client/network/bitswap/build.rs index 671277230a774..bd6222d546851 100644 --- a/client/network/build.rs +++ b/client/network/bitswap/build.rs @@ -1,4 +1,4 @@ -const PROTOS: &[&str] = &["src/schema/bitswap.v1.2.0.proto"]; +const PROTOS: &[&str] = &["bitswap.v1.2.0.proto"]; fn main() { prost_build::compile_protos(PROTOS, &["src/schema"]).unwrap(); diff --git a/client/network/bitswap/src/lib.rs b/client/network/bitswap/src/lib.rs new file mode 100644 index 0000000000000..aba7f40ce632f --- /dev/null +++ b/client/network/bitswap/src/lib.rs @@ -0,0 +1,524 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Bitswap server for Substrate. +//! +//! Allows querying transactions by hash over standard bitswap protocol +//! Only supports bitswap 1.2.0. +//! CID is expected to reference 256-bit Blake2b transaction hash. + +use cid::{self, Version}; +use futures::{channel::mpsc, StreamExt}; +use libp2p::core::PeerId; +use log::{debug, error, trace}; +use prost::Message; +use sc_client_api::BlockBackend; +use sc_network_common::{ + protocol::ProtocolName, + request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, +}; +use schema::bitswap::{ + message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType}, + Message as BitswapMessage, +}; +use sp_runtime::traits::Block as BlockT; +use std::{io, sync::Arc, time::Duration}; +use unsigned_varint::encode as varint_encode; + +mod schema; + +const LOG_TARGET: &str = "bitswap"; + +// Undocumented, but according to JS the bitswap messages have a max size of 512*1024 bytes +// https://github.com/ipfs/js-ipfs-bitswap/blob/ +// d8f80408aadab94c962f6b88f343eb9f39fa0fcc/src/decision-engine/index.js#L16 +// We set it to the same value as max substrate protocol message +const MAX_PACKET_SIZE: u64 = 16 * 1024 * 1024; + +/// Max number of queued responses before denying requests. +const MAX_REQUEST_QUEUE: usize = 20; + +/// Max number of blocks per wantlist +const MAX_WANTED_BLOCKS: usize = 16; + +/// Bitswap protocol name +const PROTOCOL_NAME: &'static str = "/ipfs/bitswap/1.2.0"; + +/// Prefix represents all metadata of a CID, without the actual content. +#[derive(PartialEq, Eq, Clone, Debug)] +struct Prefix { + /// The version of CID. + pub version: Version, + /// The codec of CID. + pub codec: u64, + /// The multihash type of CID. + pub mh_type: u64, + /// The multihash length of CID. + pub mh_len: u8, +} + +impl Prefix { + /// Convert the prefix to encoded bytes. + pub fn to_bytes(&self) -> Vec { + let mut res = Vec::with_capacity(4); + let mut buf = varint_encode::u64_buffer(); + let version = varint_encode::u64(self.version.into(), &mut buf); + res.extend_from_slice(version); + let mut buf = varint_encode::u64_buffer(); + let codec = varint_encode::u64(self.codec, &mut buf); + res.extend_from_slice(codec); + let mut buf = varint_encode::u64_buffer(); + let mh_type = varint_encode::u64(self.mh_type, &mut buf); + res.extend_from_slice(mh_type); + let mut buf = varint_encode::u64_buffer(); + let mh_len = varint_encode::u64(self.mh_len as u64, &mut buf); + res.extend_from_slice(mh_len); + res + } +} + +/// Bitswap request handler +pub struct BitswapRequestHandler { + client: Arc + Send + Sync>, + request_receiver: mpsc::Receiver, +} + +impl BitswapRequestHandler { + /// Create a new [`BitswapRequestHandler`]. + pub fn new(client: Arc + Send + Sync>) -> (Self, ProtocolConfig) { + let (tx, request_receiver) = mpsc::channel(MAX_REQUEST_QUEUE); + + let config = ProtocolConfig { + name: ProtocolName::from(PROTOCOL_NAME), + fallback_names: vec![], + max_request_size: MAX_PACKET_SIZE, + max_response_size: MAX_PACKET_SIZE, + request_timeout: Duration::from_secs(15), + inbound_queue: Some(tx), + }; + + (Self { client, request_receiver }, config) + } + + /// Run [`BitswapRequestHandler`]. + pub async fn run(mut self) { + while let Some(request) = self.request_receiver.next().await { + let IncomingRequest { peer, payload, pending_response } = request; + + match self.handle_message(&peer, &payload) { + Ok(response) => { + let response = OutgoingResponse { + result: Ok(response), + reputation_changes: Vec::new(), + sent_feedback: None, + }; + + match pending_response.send(response) { + Ok(()) => + trace!(target: LOG_TARGET, "Handled bitswap request from {peer}.",), + Err(_) => debug!( + target: LOG_TARGET, + "Failed to handle light client request from {peer}: {}", + BitswapError::SendResponse, + ), + } + }, + Err(err) => { + error!(target: LOG_TARGET, "Failed to process request from {peer}: {err}"); + + // TODO: adjust reputation? + + let response = OutgoingResponse { + result: Err(()), + reputation_changes: vec![], + sent_feedback: None, + }; + + if pending_response.send(response).is_err() { + debug!( + target: LOG_TARGET, + "Failed to handle bitswap request from {peer}: {}", + BitswapError::SendResponse, + ); + } + }, + } + } + } + + /// Handle received Bitswap request + fn handle_message( + &mut self, + peer: &PeerId, + payload: &Vec, + ) -> Result, BitswapError> { + let request = schema::bitswap::Message::decode(&payload[..])?; + + trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer); + + let mut response = BitswapMessage::default(); + + let wantlist = match request.wantlist { + Some(wantlist) => wantlist, + None => { + debug!(target: LOG_TARGET, "Unexpected bitswap message from {}", peer); + return Err(BitswapError::InvalidWantList) + }, + }; + + if wantlist.entries.len() > MAX_WANTED_BLOCKS { + trace!(target: LOG_TARGET, "Ignored request: too many entries"); + return Err(BitswapError::TooManyEntries) + } + + for entry in wantlist.entries { + let cid = match cid::Cid::read_bytes(entry.block.as_slice()) { + Ok(cid) => cid, + Err(e) => { + trace!(target: LOG_TARGET, "Bad CID {:?}: {:?}", entry.block, e); + continue + }, + }; + + if cid.version() != cid::Version::V1 || + cid.hash().code() != u64::from(cid::multihash::Code::Blake2b256) || + cid.hash().size() != 32 + { + debug!(target: LOG_TARGET, "Ignoring unsupported CID {}: {}", peer, cid); + continue + } + + let mut hash = B::Hash::default(); + hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]); + let transaction = match self.client.indexed_transaction(&hash) { + Ok(ex) => ex, + Err(e) => { + error!(target: LOG_TARGET, "Error retrieving transaction {}: {}", hash, e); + None + }, + }; + + match transaction { + Some(transaction) => { + trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash); + + if entry.want_type == WantType::Block as i32 { + let prefix = Prefix { + version: cid.version(), + codec: cid.codec(), + mh_type: cid.hash().code(), + mh_len: cid.hash().size(), + }; + response + .payload + .push(MessageBlock { prefix: prefix.to_bytes(), data: transaction }); + } else { + response.block_presences.push(BlockPresence { + r#type: BlockPresenceType::Have as i32, + cid: cid.to_bytes(), + }); + } + }, + None => { + trace!(target: LOG_TARGET, "Missing CID {:?}, hash {:?}", cid, hash); + + if entry.send_dont_have { + response.block_presences.push(BlockPresence { + r#type: BlockPresenceType::DontHave as i32, + cid: cid.to_bytes(), + }); + } + }, + } + } + + Ok(response.encode_to_vec()) + } +} + +/// Bitswap protocol error. +#[derive(Debug, thiserror::Error)] +pub enum BitswapError { + /// Protobuf decoding error. + #[error("Failed to decode request: {0}.")] + DecodeProto(#[from] prost::DecodeError), + + /// Protobuf encoding error. + #[error("Failed to encode response: {0}.")] + EncodeProto(#[from] prost::EncodeError), + + /// Client backend error. + #[error(transparent)] + Client(#[from] sp_blockchain::Error), + + /// Error parsing CID + #[error(transparent)] + BadCid(#[from] cid::Error), + + /// Packet read error. + #[error(transparent)] + Read(#[from] io::Error), + + /// Error sending response. + #[error("Failed to send response.")] + SendResponse, + + /// Message doesn't have a WANT list. + #[error("Invalid WANT list.")] + InvalidWantList, + + /// Too many blocks requested. + #[error("Too many block entries in the request.")] + TooManyEntries, +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::{channel::oneshot, SinkExt}; + use sc_block_builder::BlockBuilderProvider; + use schema::bitswap::{ + message::{wantlist::Entry, Wantlist}, + Message as BitswapMessage, + }; + use sp_consensus::BlockOrigin; + use sp_runtime::codec::Encode; + use substrate_test_runtime::Extrinsic; + use substrate_test_runtime_client::{self, prelude::*, TestClientBuilder}; + + #[tokio::test] + async fn undecodeable_message() { + let client = substrate_test_runtime_client::new(); + let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client)); + + tokio::spawn(async move { bitswap.run().await }); + + let (tx, rx) = oneshot::channel(); + config + .inbound_queue + .unwrap() + .send(IncomingRequest { + peer: PeerId::random(), + payload: vec![0x13, 0x37, 0x13, 0x38], + pending_response: tx, + }) + .await + .unwrap(); + + if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await { + assert_eq!(result, Err(())); + assert_eq!(reputation_changes, Vec::new()); + assert!(sent_feedback.is_none()); + } else { + panic!("invalid event received"); + } + } + + #[tokio::test] + async fn empty_want_list() { + let client = substrate_test_runtime_client::new(); + let (bitswap, mut config) = BitswapRequestHandler::new(Arc::new(client)); + + tokio::spawn(async move { bitswap.run().await }); + + let (tx, rx) = oneshot::channel(); + config + .inbound_queue + .as_mut() + .unwrap() + .send(IncomingRequest { + peer: PeerId::random(), + payload: BitswapMessage { wantlist: None, ..Default::default() }.encode_to_vec(), + pending_response: tx, + }) + .await + .unwrap(); + + if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await { + assert_eq!(result, Err(())); + assert_eq!(reputation_changes, Vec::new()); + assert!(sent_feedback.is_none()); + } else { + panic!("invalid event received"); + } + + // Empty WANT list should not cause an error + let (tx, rx) = oneshot::channel(); + config + .inbound_queue + .unwrap() + .send(IncomingRequest { + peer: PeerId::random(), + payload: BitswapMessage { + wantlist: Some(Default::default()), + ..Default::default() + } + .encode_to_vec(), + pending_response: tx, + }) + .await + .unwrap(); + + if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await { + assert_eq!(result, Ok(BitswapMessage::default().encode_to_vec())); + assert_eq!(reputation_changes, Vec::new()); + assert!(sent_feedback.is_none()); + } else { + panic!("invalid event received"); + } + } + + #[tokio::test] + async fn too_long_want_list() { + let client = substrate_test_runtime_client::new(); + let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client)); + + tokio::spawn(async move { bitswap.run().await }); + + let (tx, rx) = oneshot::channel(); + config + .inbound_queue + .unwrap() + .send(IncomingRequest { + peer: PeerId::random(), + payload: BitswapMessage { + wantlist: Some(Wantlist { + entries: (0..MAX_WANTED_BLOCKS + 1) + .map(|_| Entry::default()) + .collect::>(), + full: false, + }), + ..Default::default() + } + .encode_to_vec(), + pending_response: tx, + }) + .await + .unwrap(); + + if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await { + assert_eq!(result, Err(())); + assert_eq!(reputation_changes, Vec::new()); + assert!(sent_feedback.is_none()); + } else { + panic!("invalid event received"); + } + } + + #[tokio::test] + async fn transaction_not_found() { + let client = TestClientBuilder::with_tx_storage(u32::MAX).build(); + + let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client)); + tokio::spawn(async move { bitswap.run().await }); + + let (tx, rx) = oneshot::channel(); + config + .inbound_queue + .unwrap() + .send(IncomingRequest { + peer: PeerId::random(), + payload: BitswapMessage { + wantlist: Some(Wantlist { + entries: vec![Entry { + block: cid::Cid::new_v1( + 0x70, + cid::multihash::Multihash::wrap( + u64::from(cid::multihash::Code::Blake2b256), + &[0u8; 32], + ) + .unwrap(), + ) + .to_bytes(), + ..Default::default() + }], + full: false, + }), + ..Default::default() + } + .encode_to_vec(), + pending_response: tx, + }) + .await + .unwrap(); + + if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await { + assert_eq!(result, Ok(vec![])); + assert_eq!(reputation_changes, Vec::new()); + assert!(sent_feedback.is_none()); + } else { + panic!("invalid event received"); + } + } + + #[tokio::test] + async fn transaction_found() { + let mut client = TestClientBuilder::with_tx_storage(u32::MAX).build(); + let mut block_builder = client.new_block(Default::default()).unwrap(); + + let ext = Extrinsic::Store(vec![0x13, 0x37, 0x13, 0x38]); + + block_builder.push(ext.clone()).unwrap(); + let block = block_builder.build().unwrap().block; + + client.import(BlockOrigin::File, block).await.unwrap(); + + let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client)); + + tokio::spawn(async move { bitswap.run().await }); + + let (tx, rx) = oneshot::channel(); + config + .inbound_queue + .unwrap() + .send(IncomingRequest { + peer: PeerId::random(), + payload: BitswapMessage { + wantlist: Some(Wantlist { + entries: vec![Entry { + block: cid::Cid::new_v1( + 0x70, + cid::multihash::Multihash::wrap( + u64::from(cid::multihash::Code::Blake2b256), + &sp_core::hashing::blake2_256(&ext.encode()[2..]), + ) + .unwrap(), + ) + .to_bytes(), + ..Default::default() + }], + full: false, + }), + ..Default::default() + } + .encode_to_vec(), + pending_response: tx, + }) + .await + .unwrap(); + + if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await { + assert_eq!(reputation_changes, Vec::new()); + assert!(sent_feedback.is_none()); + + let response = + schema::bitswap::Message::decode(&result.expect("fetch to succeed")[..]).unwrap(); + assert_eq!(response.payload[0].data, vec![0x13, 0x37, 0x13, 0x38]); + } else { + panic!("invalid event received"); + } + } +} diff --git a/client/network/src/schema.rs b/client/network/bitswap/src/schema.rs similarity index 97% rename from client/network/src/schema.rs rename to client/network/bitswap/src/schema.rs index 4893bc28a7355..362e59aca68f9 100644 --- a/client/network/src/schema.rs +++ b/client/network/bitswap/src/schema.rs @@ -18,6 +18,6 @@ //! Include sources generated from protobuf definitions. -pub mod bitswap { +pub(crate) mod bitswap { include!(concat!(env!("OUT_DIR"), "/bitswap.message.rs")); } diff --git a/client/network/src/schema/bitswap.v1.2.0.proto b/client/network/bitswap/src/schema/bitswap.v1.2.0.proto similarity index 100% rename from client/network/src/schema/bitswap.v1.2.0.proto rename to client/network/bitswap/src/schema/bitswap.v1.2.0.proto diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 24df47db6803f..88571647fb0e3 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -17,7 +17,6 @@ // along with this program. If not, see . use crate::{ - bitswap::Bitswap, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, peer_info, protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol}, @@ -32,8 +31,7 @@ use libp2p::{ identify::IdentifyInfo, kad::record, swarm::{ - behaviour::toggle::Toggle, NetworkBehaviour, NetworkBehaviourAction, - NetworkBehaviourEventProcess, PollParameters, + NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, }, NetworkBehaviour, }; @@ -79,8 +77,6 @@ where peer_info: peer_info::PeerInfoBehaviour, /// Discovers nodes of the network. discovery: DiscoveryBehaviour, - /// Bitswap server for blockchain data. - bitswap: Toggle>, /// Generic request-response protocols. request_responses: request_responses::RequestResponsesBehaviour, @@ -214,7 +210,6 @@ where block_request_protocol_config: ProtocolConfig, state_request_protocol_config: ProtocolConfig, warp_sync_protocol_config: Option, - bitswap: Option>, light_client_request_protocol_config: ProtocolConfig, // All remaining request protocol configs. mut request_response_protocols: Vec, @@ -239,7 +234,6 @@ where substrate, peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key), discovery: disco_config.finish(), - bitswap: bitswap.into(), request_responses: request_responses::RequestResponsesBehaviour::new( request_response_protocols.into_iter(), peerset, @@ -338,16 +332,6 @@ fn reported_roles_to_observed_role(roles: Roles) -> ObservedRole { } } -impl NetworkBehaviourEventProcess for Behaviour -where - B: BlockT, - Client: HeaderBackend + 'static, -{ - fn inject_event(&mut self, event: void::Void) { - void::unreachable(event) - } -} - impl NetworkBehaviourEventProcess> for Behaviour where B: BlockT, diff --git a/client/network/src/bitswap.rs b/client/network/src/bitswap.rs deleted file mode 100644 index 52fa0c36caedf..0000000000000 --- a/client/network/src/bitswap.rs +++ /dev/null @@ -1,416 +0,0 @@ -// Copyright 2022 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Bitswap server for substrate. -//! -//! Allows querying transactions by hash over standard bitswap protocol -//! Only supports bitswap 1.2.0. -//! CID is expected to reference 256-bit Blake2b transaction hash. - -use crate::schema::bitswap::{ - message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType}, - Message as BitswapMessage, -}; -use cid::Version; -use core::pin::Pin; -use futures::{ - io::{AsyncRead, AsyncWrite}, - Future, -}; -use libp2p::{ - core::{ - connection::ConnectionId, upgrade, InboundUpgrade, Multiaddr, OutboundUpgrade, PeerId, - UpgradeInfo, - }, - swarm::{ - NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, OneShotHandler, PollParameters, - }, -}; -use log::{debug, error, trace}; -use prost::Message; -use sc_client_api::BlockBackend; -use sp_runtime::traits::Block as BlockT; -use std::{ - collections::VecDeque, - io, - marker::PhantomData, - sync::Arc, - task::{Context, Poll}, -}; -use unsigned_varint::encode as varint_encode; - -const LOG_TARGET: &str = "bitswap"; - -// Undocumented, but according to JS the bitswap messages have a max size of 512*1024 bytes -// https://github.com/ipfs/js-ipfs-bitswap/blob/ -// d8f80408aadab94c962f6b88f343eb9f39fa0fcc/src/decision-engine/index.js#L16 -// We set it to the same value as max substrate protocol message -const MAX_PACKET_SIZE: usize = 16 * 1024 * 1024; - -// Max number of queued responses before denying requests. -const MAX_RESPONSE_QUEUE: usize = 20; -// Max number of blocks per wantlist -const MAX_WANTED_BLOCKS: usize = 16; - -const PROTOCOL_NAME: &[u8] = b"/ipfs/bitswap/1.2.0"; - -type FutureResult = Pin> + Send>>; - -/// Bitswap protocol config -#[derive(Clone, Copy, Debug, Default)] -pub struct BitswapConfig; - -impl UpgradeInfo for BitswapConfig { - type Info = &'static [u8]; - type InfoIter = std::iter::Once; - - fn protocol_info(&self) -> Self::InfoIter { - std::iter::once(PROTOCOL_NAME) - } -} - -impl InboundUpgrade for BitswapConfig -where - TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, -{ - type Output = BitswapMessage; - type Error = BitswapError; - type Future = FutureResult; - - fn upgrade_inbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future { - Box::pin(async move { - let packet = upgrade::read_length_prefixed(&mut socket, MAX_PACKET_SIZE).await?; - let message: BitswapMessage = Message::decode(packet.as_slice())?; - Ok(message) - }) - } -} - -impl UpgradeInfo for BitswapMessage { - type Info = &'static [u8]; - type InfoIter = std::iter::Once; - - fn protocol_info(&self) -> Self::InfoIter { - std::iter::once(PROTOCOL_NAME) - } -} - -impl OutboundUpgrade for BitswapMessage -where - TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, -{ - type Output = (); - type Error = io::Error; - type Future = FutureResult; - - fn upgrade_outbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future { - Box::pin(async move { - let data = self.encode_to_vec(); - upgrade::write_length_prefixed(&mut socket, data).await - }) - } -} - -/// Internal protocol handler event. -#[derive(Debug)] -pub enum HandlerEvent { - /// We received a `BitswapMessage` from a remote. - Request(BitswapMessage), - /// We successfully sent a `BitswapMessage`. - ResponseSent, -} - -impl From for HandlerEvent { - fn from(message: BitswapMessage) -> Self { - Self::Request(message) - } -} - -impl From<()> for HandlerEvent { - fn from(_: ()) -> Self { - Self::ResponseSent - } -} - -/// Prefix represents all metadata of a CID, without the actual content. -#[derive(PartialEq, Eq, Clone, Debug)] -struct Prefix { - /// The version of CID. - pub version: Version, - /// The codec of CID. - pub codec: u64, - /// The multihash type of CID. - pub mh_type: u64, - /// The multihash length of CID. - pub mh_len: u8, -} - -impl Prefix { - /// Convert the prefix to encoded bytes. - pub fn to_bytes(&self) -> Vec { - let mut res = Vec::with_capacity(4); - let mut buf = varint_encode::u64_buffer(); - let version = varint_encode::u64(self.version.into(), &mut buf); - res.extend_from_slice(version); - let mut buf = varint_encode::u64_buffer(); - let codec = varint_encode::u64(self.codec, &mut buf); - res.extend_from_slice(codec); - let mut buf = varint_encode::u64_buffer(); - let mh_type = varint_encode::u64(self.mh_type, &mut buf); - res.extend_from_slice(mh_type); - let mut buf = varint_encode::u64_buffer(); - let mh_len = varint_encode::u64(self.mh_len as u64, &mut buf); - res.extend_from_slice(mh_len); - res - } -} - -/// Bitswap trait -pub trait BitswapT { - /// Get single indexed transaction by content hash. - /// - /// Note that this will only fetch transactions - /// that are indexed by the runtime with `storage_index_transaction`. - fn indexed_transaction( - &self, - hash: ::Hash, - ) -> sp_blockchain::Result>>; - - /// Queue of blocks ready to be sent out on `poll()` - fn ready_blocks(&mut self) -> &mut VecDeque<(PeerId, BitswapMessage)>; -} - -/// Network behaviour that handles sending and receiving IPFS blocks. -struct BitswapInternal { - client: Arc, - ready_blocks: VecDeque<(PeerId, BitswapMessage)>, - _block: PhantomData, -} - -impl BitswapInternal { - /// Create a new instance of the bitswap protocol handler. - pub fn new(client: Arc) -> Self { - Self { client, ready_blocks: Default::default(), _block: PhantomData::default() } - } -} - -impl BitswapT for BitswapInternal -where - Block: BlockT, - Client: BlockBackend, -{ - fn indexed_transaction( - &self, - hash: ::Hash, - ) -> sp_blockchain::Result>> { - self.client.indexed_transaction(&hash) - } - - fn ready_blocks(&mut self) -> &mut VecDeque<(PeerId, BitswapMessage)> { - &mut self.ready_blocks - } -} - -/// Wrapper for bitswap trait object implement NetworkBehaviour -pub struct Bitswap { - inner: Box + Sync + Send>, -} - -impl Bitswap { - /// Create new Bitswap wrapper - pub fn from_client + Send + Sync + 'static>( - client: Arc, - ) -> Self { - let inner = Box::new(BitswapInternal::new(client)) as Box<_>; - Self { inner } - } -} - -impl BitswapT for Bitswap { - fn indexed_transaction( - &self, - hash: ::Hash, - ) -> sp_blockchain::Result>> { - self.inner.indexed_transaction(hash) - } - - fn ready_blocks(&mut self) -> &mut VecDeque<(PeerId, BitswapMessage)> { - self.inner.ready_blocks() - } -} - -impl BitswapT for Box -where - T: BitswapT, -{ - fn indexed_transaction( - &self, - hash: ::Hash, - ) -> sp_blockchain::Result>> { - T::indexed_transaction(self, hash) - } - - fn ready_blocks(&mut self) -> &mut VecDeque<(PeerId, BitswapMessage)> { - T::ready_blocks(self) - } -} - -impl NetworkBehaviour for Bitswap -where - B: BlockT, -{ - type ConnectionHandler = OneShotHandler; - type OutEvent = void::Void; - - fn new_handler(&mut self) -> Self::ConnectionHandler { - Default::default() - } - - fn addresses_of_peer(&mut self, _peer: &PeerId) -> Vec { - Vec::new() - } - - fn inject_event(&mut self, peer: PeerId, _connection: ConnectionId, message: HandlerEvent) { - let request = match message { - HandlerEvent::ResponseSent => return, - HandlerEvent::Request(msg) => msg, - }; - trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer); - if self.ready_blocks().len() > MAX_RESPONSE_QUEUE { - debug!(target: LOG_TARGET, "Ignored request: queue is full"); - return - } - - let mut response = BitswapMessage { - wantlist: None, - blocks: Default::default(), - payload: Default::default(), - block_presences: Default::default(), - pending_bytes: 0, - }; - let wantlist = match request.wantlist { - Some(wantlist) => wantlist, - None => { - debug!(target: LOG_TARGET, "Unexpected bitswap message from {}", peer); - return - }, - }; - if wantlist.entries.len() > MAX_WANTED_BLOCKS { - trace!(target: LOG_TARGET, "Ignored request: too many entries"); - return - } - for entry in wantlist.entries { - let cid = match cid::Cid::read_bytes(entry.block.as_slice()) { - Ok(cid) => cid, - Err(e) => { - trace!(target: LOG_TARGET, "Bad CID {:?}: {:?}", entry.block, e); - continue - }, - }; - if cid.version() != cid::Version::V1 || - cid.hash().code() != u64::from(cid::multihash::Code::Blake2b256) || - cid.hash().size() != 32 - { - debug!(target: LOG_TARGET, "Ignoring unsupported CID {}: {}", peer, cid); - continue - } - let mut hash = B::Hash::default(); - hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]); - let transaction = match self.indexed_transaction(hash) { - Ok(ex) => ex, - Err(e) => { - error!(target: LOG_TARGET, "Error retrieving transaction {}: {}", hash, e); - None - }, - }; - match transaction { - Some(transaction) => { - trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash); - if entry.want_type == WantType::Block as i32 { - let prefix = Prefix { - version: cid.version(), - codec: cid.codec(), - mh_type: cid.hash().code(), - mh_len: cid.hash().size(), - }; - response - .payload - .push(MessageBlock { prefix: prefix.to_bytes(), data: transaction }); - } else { - response.block_presences.push(BlockPresence { - r#type: BlockPresenceType::Have as i32, - cid: cid.to_bytes(), - }); - } - }, - None => { - trace!(target: LOG_TARGET, "Missing CID {:?}, hash {:?}", cid, hash); - if entry.send_dont_have { - response.block_presences.push(BlockPresence { - r#type: BlockPresenceType::DontHave as i32, - cid: cid.to_bytes(), - }); - } - }, - } - } - trace!(target: LOG_TARGET, "Response: {:?}", response); - self.ready_blocks().push_back((peer, response)); - } - - fn poll( - &mut self, - _ctx: &mut Context, - _: &mut impl PollParameters, - ) -> Poll> { - if let Some((peer_id, message)) = self.ready_blocks().pop_front() { - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event: message, - }) - } - Poll::Pending - } -} - -/// Bitswap protocol error. -#[derive(Debug, thiserror::Error)] -pub enum BitswapError { - /// Protobuf decoding error. - #[error("Failed to decode request: {0}.")] - DecodeProto(#[from] prost::DecodeError), - - /// Protobuf encoding error. - #[error("Failed to encode response: {0}.")] - EncodeProto(#[from] prost::EncodeError), - - /// Client backend error. - #[error(transparent)] - Client(#[from] sp_blockchain::Error), - - /// Error parsing CID - #[error(transparent)] - BadCid(#[from] cid::Error), - - /// Packet read error. - #[error(transparent)] - Read(#[from] io::Error), - - /// Error sending response. - #[error("Failed to send response.")] - SendResponse, -} diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 3fe95deb0c1ed..d7ca8b48a7c88 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -31,7 +31,7 @@ pub use sc_network_common::{ pub use libp2p::{build_multiaddr, core::PublicKey, identity}; -use crate::{bitswap::Bitswap, ExHashT}; +use crate::ExHashT; use core::{fmt, iter}; use futures::future; @@ -79,9 +79,6 @@ where /// Client that contains the blockchain. pub chain: Arc, - /// Bitswap block request protocol implementation. - pub bitswap: Option>, - /// Pool of transactions. /// /// The network worker will fetch transactions from this object in order to propagate them on @@ -139,6 +136,9 @@ where /// Optional warp sync protocol config. pub warp_sync_protocol_config: Option, + + /// Request response protocol configurations + pub request_response_protocol_configs: Vec, } /// Role of the local node. diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 35a91e77da524..320104d0f9554 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -249,12 +249,10 @@ mod discovery; mod peer_info; mod protocol; mod request_responses; -mod schema; mod service; mod transport; mod utils; -pub mod bitswap; pub mod config; pub mod error; pub mod network_state; diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 4610b025dde0d..fb98db3251647 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -137,7 +137,7 @@ impl NetworkWorker where B: BlockT + 'static, H: ExHashT, - Client: sp_blockchain::HeaderBackend + 'static, + Client: HeaderBackend + 'static, { /// Creates the network service. /// @@ -376,7 +376,6 @@ where params.block_request_protocol_config, params.state_request_protocol_config, params.warp_sync_protocol_config, - params.bitswap, params.light_client_request_protocol_config, params.network_config.request_response_protocols, peerset_handle.clone(), @@ -1743,14 +1742,10 @@ where let reason = match cause { Some(ConnectionError::IO(_)) => "transport-error", Some(ConnectionError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::B(EitherError::A( - PingFailure::Timeout, - ))), + EitherError::B(EitherError::A(PingFailure::Timeout)), )))) => "ping-timeout", Some(ConnectionError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::A( - NotifsHandlerError::SyncNotificationsClogged, - )), + EitherError::A(NotifsHandlerError::SyncNotificationsClogged), )))) => "sync-notifications-clogged", Some(ConnectionError::Handler(_)) => "protocol-error", Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout", diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index 8770e14bf5338..a9505c5341c3d 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -146,11 +146,11 @@ fn build_test_full_node( import_queue, chain_sync: Box::new(chain_sync), metrics_registry: None, - bitswap: None, block_request_protocol_config, state_request_protocol_config, light_client_request_protocol_config, warp_sync_protocol_config: None, + request_response_protocol_configs: Vec::new(), }) .unwrap(); diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 837cdeed0f3d1..5a29e587ceff5 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -883,11 +883,11 @@ where import_queue, chain_sync: Box::new(chain_sync), metrics_registry: None, - bitswap: None, block_request_protocol_config, state_request_protocol_config, light_client_request_protocol_config, warp_sync_protocol_config: Some(warp_protocol_config), + request_response_protocol_configs: Vec::new(), }) .unwrap(); diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index 0acdbb1b1b63e..e96421d031d9a 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -51,6 +51,7 @@ sc-consensus = { version = "0.10.0-dev", path = "../../client/consensus/common" sp-inherents = { version = "4.0.0-dev", path = "../../primitives/inherents" } sp-storage = { version = "6.0.0", path = "../../primitives/storage" } sc-network = { version = "0.10.0-dev", path = "../network" } +sc-network-bitswap = { version = "0.10.0-dev", path = "../network/bitswap" } sc-network-common = { version = "0.10.0-dev", path = "../network/common" } sc-network-light = { version = "0.10.0-dev", path = "../network/light" } sc-network-sync = { version = "0.10.0-dev", path = "../network/sync" } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index f03ba6de2866d..5a2f4cf978b41 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -37,7 +37,8 @@ use sc_client_db::{Backend, DatabaseSettings}; use sc_consensus::import_queue::ImportQueue; use sc_executor::RuntimeVersionOf; use sc_keystore::LocalKeystore; -use sc_network::{bitswap::Bitswap, config::SyncMode, NetworkService}; +use sc_network::{config::SyncMode, NetworkService}; +use sc_network_bitswap::BitswapRequestHandler; use sc_network_common::{ service::{NetworkStateInfo, NetworkStatusProvider, NetworkTransaction}, sync::warp::WarpSyncProvider, @@ -746,6 +747,8 @@ where warp_sync, } = params; + let mut request_response_protocol_configs = Vec::new(); + if warp_sync.is_none() && config.network.sync_mode.is_warp() { return Err("Warp sync enabled, but no warp sync provider configured.".into()) } @@ -835,6 +838,13 @@ where config.network.max_parallel_downloads, warp_sync_provider, )?; + + request_response_protocol_configs.push(config.network.ipfs_server.then(|| { + let (handler, protocol_config) = BitswapRequestHandler::new(client.clone()); + spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler.run()); + protocol_config + })); + let network_params = sc_network::config::Params { role: config.role.clone(), executor: { @@ -856,12 +866,15 @@ where fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned), import_queue: Box::new(import_queue), chain_sync: Box::new(chain_sync), - bitswap: config.network.ipfs_server.then(|| Bitswap::from_client(client.clone())), metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_request_protocol_config, state_request_protocol_config, warp_sync_protocol_config, light_client_request_protocol_config, + request_response_protocol_configs: request_response_protocol_configs + .into_iter() + .flatten() + .collect::>(), }; let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();