diff --git a/Cargo.lock b/Cargo.lock index d5197ba4ea65b..7340f35ca0d04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -362,6 +362,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base-x" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4521f3e3d031370679b3b140beb36dfe4801b09ac77e30c61941f97df3ef28b" + [[package]] name = "base58" version = "0.1.0" @@ -464,6 +470,32 @@ dependencies = [ "constant_time_eq", ] +[[package]] +name = "blake2s_simd" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e461a7034e85b211a4acb57ee2e6730b32912b06c08cc242243c39fc21ae6a2" +dependencies = [ + "arrayref", + "arrayvec 0.5.2", + "constant_time_eq", +] + +[[package]] +name = "blake3" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9ff35b701f3914bdb8fad3368d822c766ef2858b2583198e41639b936f09d3f" +dependencies = [ + "arrayref", + "arrayvec 0.5.2", + "cc", + "cfg-if 0.1.10", + "constant_time_eq", + "crypto-mac 0.8.0", + "digest 0.9.0", +] + [[package]] name = "block-buffer" version = "0.7.3" @@ -706,6 +738,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "cid" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d88f30b1e74e7063df5711496f3ee6e74a9735d62062242d70cddf77717f18e" +dependencies = [ + "multibase", + "multihash", + "unsigned-varint 0.5.1", +] + [[package]] name = "cipher" version = "0.2.5" @@ -1163,6 +1206,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "993a608597367c6377b258c25d7120740f00ed23a2252b729b1932dd7866f908" +[[package]] +name = "data-encoding-macro" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a94feec3d2ba66c0b6621bca8bc6f68415b1e5c69af3586fdd0af9fd9f29b17" +dependencies = [ + "data-encoding", + "data-encoding-macro-internal", +] + +[[package]] +name = "data-encoding-macro-internal" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0f83e699727abca3c56e187945f303389590305ab2f0185ea445aa66e8d5f2a" +dependencies = [ + "data-encoding", + "syn", +] + [[package]] name = "derive_more" version = "0.99.11" @@ -3580,16 +3643,31 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0debeb9fcf88823ea64d64e4a815ab1643f33127d995978e099942ce38f25238" +[[package]] +name = "multibase" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b78c60039650ff12e140ae867ef5299a58e19dded4d334c849dc7177083667e2" +dependencies = [ + "base-x", + "data-encoding", + "data-encoding-macro", +] + [[package]] name = "multihash" version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb63389ee5fcd4df3f8727600f4a0c3df53c541f0ed4e8b50a9ae51a80fc1efe" dependencies = [ + "blake2b_simd", + "blake2s_simd", + "blake3", "digest 0.9.0", "generic-array 0.14.4", "multihash-derive", "sha2 0.9.2", + "sha3", "unsigned-varint 0.5.1", ] @@ -7105,6 +7183,7 @@ dependencies = [ "bitflags", "bs58", "bytes 1.0.1", + "cid", "derive_more", "either", "erased-serde", diff --git a/client/api/src/client.rs b/client/api/src/client.rs index 4dc2b6bb524e4..990a7908b62bb 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -95,6 +95,17 @@ pub trait BlockBackend { /// Get block hash by number. fn block_hash(&self, number: NumberFor) -> sp_blockchain::Result>; + + /// Get single extrinsic by hash. + fn extrinsic( + &self, + hash: &Block::Hash, + ) -> sp_blockchain::Result::Extrinsic>>; + + /// Check if extrinsic exists. + fn have_extrinsic(&self, hash: &Block::Hash) -> sp_blockchain::Result { + Ok(self.extrinsic(hash)?.is_some()) + } } /// Provide a list of potential uncle headers for a given block. diff --git a/client/api/src/in_mem.rs b/client/api/src/in_mem.rs index cef52982f167b..c108acc7b43b9 100644 --- a/client/api/src/in_mem.rs +++ b/client/api/src/in_mem.rs @@ -386,6 +386,13 @@ impl blockchain::Backend for Blockchain { fn children(&self, _parent_hash: Block::Hash) -> sp_blockchain::Result> { unimplemented!() } + + fn extrinsic( + &self, + _hash: &Block::Hash, + ) -> sp_blockchain::Result::Extrinsic>> { + unimplemented!("Not supported by the in-mem backend.") + } } impl blockchain::ProvideCache for Blockchain { diff --git a/client/cli/src/params/network_params.rs b/client/cli/src/params/network_params.rs index 4a926fdce8bbf..f4a6e8d3982ba 100644 --- a/client/cli/src/params/network_params.rs +++ b/client/cli/src/params/network_params.rs @@ -110,6 +110,10 @@ pub struct NetworkParams { /// security improvements. #[structopt(long)] pub kademlia_disjoint_query_paths: bool, + + /// Join the IPFS network and serve transactions over bitswap protocol. + #[structopt(long)] + pub ipfs_server: bool, } impl NetworkParams { @@ -181,6 +185,7 @@ impl NetworkParams { allow_non_globals_in_dht, kademlia_disjoint_query_paths: self.kademlia_disjoint_query_paths, yamux_window_size: None, + ipfs_server: self.ipfs_server, } } } diff --git a/client/db/src/lib.rs b/client/db/src/lib.rs index a976cbc2ce8d0..6654083939dae 100644 --- a/client/db/src/lib.rs +++ b/client/db/src/lib.rs @@ -448,20 +448,6 @@ impl BlockchainDb { header.digest().log(DigestItem::as_changes_trie_root) .cloned())) } - - fn extrinsic(&self, hash: &Block::Hash) -> ClientResult> { - match self.db.get(columns::TRANSACTION, hash.as_ref()) { - Some(ex) => { - match Decode::decode(&mut &ex[..]) { - Ok(ex) => Ok(Some(ex)), - Err(err) => Err(sp_blockchain::Error::Backend( - format!("Error decoding extrinsic {}: {}", hash, err) - )), - } - }, - None => Ok(None), - } - } } impl sc_client_api::blockchain::HeaderBackend for BlockchainDb { @@ -532,7 +518,7 @@ impl sc_client_api::blockchain::Backend for BlockchainDb::decode(&mut &body[..]) { Ok(hashes) => { let extrinsics: ClientResult> = hashes.into_iter().map( - |h| self.extrinsic(&h) .and_then(|maybe_ex| maybe_ex.ok_or_else( + |h| self.extrinsic(&h).and_then(|maybe_ex| maybe_ex.ok_or_else( || sp_blockchain::Error::Backend( format!("Missing transaction: {}", h)))) ).collect(); @@ -576,6 +562,24 @@ impl sc_client_api::blockchain::Backend for BlockchainDb ClientResult> { children::read_children(&*self.db, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash) } + + fn extrinsic(&self, hash: &Block::Hash) -> ClientResult> { + match self.db.get(columns::TRANSACTION, hash.as_ref()) { + Some(ex) => { + match Decode::decode(&mut &ex[..]) { + Ok(ex) => Ok(Some(ex)), + Err(err) => Err(sp_blockchain::Error::Backend( + format!("Error decoding extrinsic {}: {}", hash, err) + )), + } + }, + None => Ok(None), + } + } + + fn have_extrinsic(&self, hash: &Block::Hash) -> ClientResult { + Ok(self.db.contains(columns::TRANSACTION, hash.as_ref())) + } } impl sc_client_api::blockchain::ProvideCache for BlockchainDb { diff --git a/client/db/src/utils.rs b/client/db/src/utils.rs index baea6aab69fa0..cd9b2a6f56d41 100644 --- a/client/db/src/utils.rs +++ b/client/db/src/utils.rs @@ -401,7 +401,13 @@ pub fn read_meta(db: &dyn Database, col_header: u32) -> Result< } { let hash = header.hash(); - debug!("DB Opened blockchain db, fetched {} = {:?} ({})", desc, hash, header.number()); + debug!( + target: "db", + "Opened blockchain db, fetched {} = {:?} ({})", + desc, + hash, + header.number() + ); Ok((hash, *header.number())) } else { Ok((genesis_hash.clone(), Zero::zero())) diff --git a/client/light/src/blockchain.rs b/client/light/src/blockchain.rs index f682e6e35b3d0..bcabc365676a5 100644 --- a/client/light/src/blockchain.rs +++ b/client/light/src/blockchain.rs @@ -128,6 +128,13 @@ impl BlockchainBackend for Blockchain where Block: BlockT, S fn children(&self, _parent_hash: Block::Hash) -> ClientResult> { Err(ClientError::NotAvailableOnLightClient) } + + fn extrinsic( + &self, + _hash: &Block::Hash, + ) -> ClientResult::Extrinsic>> { + Err(ClientError::NotAvailableOnLightClient) + } } impl, Block: BlockT> ProvideCache for Blockchain { diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 8c6fc4e668d08..d6cb9bcb0eb8e 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -21,6 +21,7 @@ async-trait = "0.1" async-std = "1.6.5" bitflags = "1.2.0" bs58 = "0.4.0" +cid = "0.6.0" bytes = "1" codec = { package = "parity-scale-codec", version = "2.0.0", features = ["derive"] } derive_more = "0.99.2" diff --git a/client/network/build.rs b/client/network/build.rs index 2ccc72d99df96..0eea622e87574 100644 --- a/client/network/build.rs +++ b/client/network/build.rs @@ -1,6 +1,7 @@ const PROTOS: &[&str] = &[ "src/schema/api.v1.proto", - "src/schema/light.v1.proto" + "src/schema/light.v1.proto", + "src/schema/bitswap.v1.2.0.proto", ]; fn main() { diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index a34f6e0960c47..ffa8bb3c417e1 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -18,6 +18,7 @@ use crate::{ config::{ProtocolId, Role}, + bitswap::Bitswap, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol}, peer_info, request_responses, light_client_requests, @@ -30,7 +31,9 @@ use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; use libp2p::identify::IdentifyInfo; use libp2p::kad::record; -use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; +use libp2p::swarm::{ + NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, toggle::Toggle +}; use log::debug; use prost::Message; use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}}; @@ -58,6 +61,8 @@ pub struct Behaviour { peer_info: peer_info::PeerInfoBehaviour, /// Discovers nodes of the network. discovery: DiscoveryBehaviour, + /// Bitswap server for blockchain data. + bitswap: Toggle>, /// Generic request-reponse protocols. request_responses: request_responses::RequestResponsesBehaviour, @@ -180,6 +185,7 @@ impl Behaviour { light_client_request_sender: light_client_requests::sender::LightClientRequestSender, disco_config: DiscoveryConfig, block_request_protocol_config: request_responses::ProtocolConfig, + bitswap: Option>, light_client_request_protocol_config: request_responses::ProtocolConfig, // All remaining request protocol configs. mut request_response_protocols: Vec, @@ -194,6 +200,7 @@ impl Behaviour { substrate, peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key), discovery: disco_config.finish(), + bitswap: bitswap.into(), request_responses: request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?, light_client_request_sender, @@ -297,6 +304,13 @@ fn reported_roles_to_observed_role(local_role: &Role, remote: &PeerId, roles: Ro } } +impl NetworkBehaviourEventProcess for +Behaviour { + fn inject_event(&mut self, event: void::Void) { + void::unreachable(event) + } +} + impl NetworkBehaviourEventProcess> for Behaviour { fn inject_event(&mut self, event: CustomMessageOutcome) { diff --git a/client/network/src/bitswap.rs b/client/network/src/bitswap.rs new file mode 100644 index 0000000000000..7129f3dbe07b1 --- /dev/null +++ b/client/network/src/bitswap.rs @@ -0,0 +1,338 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Bitswap server for substrate. +//! +//! Allows querying transactions by hash over standard bitswap protocol +//! Only supports bitswap 1.2.0. +//! CID is expected to reference 256-bit Blake2b transaction hash. + +use std::collections::VecDeque; +use std::io; +use std::sync::Arc; +use std::task::{Context, Poll}; +use cid::Version; +use codec::Encode; +use core::pin::Pin; +use futures::Future; +use futures::io::{AsyncRead, AsyncWrite}; +use libp2p::core::{ + connection::ConnectionId, Multiaddr, PeerId, + upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo, +}; +use libp2p::swarm::{ + NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + ProtocolsHandler, IntoProtocolsHandler, OneShotHandler, +}; +use log::{error, debug, trace}; +use prost::Message; +use sp_runtime::traits::{Block as BlockT}; +use unsigned_varint::{encode as varint_encode}; +use crate::chain::Client; +use crate::schema::bitswap::{ + Message as BitswapMessage, + message::{wantlist::WantType, Block as MessageBlock, BlockPresenceType, BlockPresence}, +}; + +const LOG_TARGET: &str = "bitswap"; + +// Undocumented, but according to JS the bitswap messages have a max size of 512*1024 bytes +// https://github.com/ipfs/js-ipfs-bitswap/blob/ +// d8f80408aadab94c962f6b88f343eb9f39fa0fcc/src/decision-engine/index.js#L16 +// We set it to the same value as max substrate protocol message +const MAX_PACKET_SIZE: usize = 16 * 1024 * 1024; + +// Max number of queued responses before denying requests. +const MAX_RESPONSE_QUEUE: usize = 20; +// Max number of blocks per wantlist +const MAX_WANTED_BLOCKS: usize = 16; + +const PROTOCOL_NAME: &'static [u8] = b"/ipfs/bitswap/1.2.0"; + +type FutureResult = Pin> + Send>>; + +/// Bitswap protocol config +#[derive(Clone, Copy, Debug, Default)] +pub struct BitswapConfig; + +impl UpgradeInfo for BitswapConfig { + type Info = &'static [u8]; + type InfoIter = std::iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + std::iter::once(PROTOCOL_NAME) + } +} + +impl InboundUpgrade for BitswapConfig +where + TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, +{ + type Output = BitswapMessage; + type Error = BitswapError; + type Future = FutureResult; + + fn upgrade_inbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future { + Box::pin(async move { + let packet = upgrade::read_one(&mut socket, MAX_PACKET_SIZE).await?; + let message: BitswapMessage = Message::decode(packet.as_slice())?; + Ok(message) + }) + } +} + +impl UpgradeInfo for BitswapMessage { + type Info = &'static [u8]; + type InfoIter = std::iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + std::iter::once(PROTOCOL_NAME) + } +} + +impl OutboundUpgrade for BitswapMessage +where + TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, +{ + type Output = (); + type Error = io::Error; + type Future = FutureResult; + + fn upgrade_outbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future { + Box::pin(async move { + let mut data = Vec::with_capacity(self.encoded_len()); + self.encode(&mut data)?; + upgrade::write_one(&mut socket, data).await + }) + } +} + +/// Internal protocol handler event. +#[derive(Debug)] +pub enum HandlerEvent { + /// We received a `BitswapMessage` from a remote. + Request(BitswapMessage), + /// We successfully sent a `BitswapMessage`. + ResponseSent, +} + +impl From for HandlerEvent { + fn from(message: BitswapMessage) -> Self { + Self::Request(message) + } +} + +impl From<()> for HandlerEvent { + fn from(_: ()) -> Self { + Self::ResponseSent + } +} + +/// Prefix represents all metadata of a CID, without the actual content. +#[derive(PartialEq, Eq, Clone, Debug)] +struct Prefix { + /// The version of CID. + pub version: Version, + /// The codec of CID. + pub codec: u64, + /// The multihash type of CID. + pub mh_type: u64, + /// The multihash length of CID. + pub mh_len: u8, +} + +impl Prefix { + /// Convert the prefix to encoded bytes. + pub fn to_bytes(&self) -> Vec { + let mut res = Vec::with_capacity(4); + let mut buf = varint_encode::u64_buffer(); + let version = varint_encode::u64(self.version.into(), &mut buf); + res.extend_from_slice(version); + let mut buf = varint_encode::u64_buffer(); + let codec = varint_encode::u64(self.codec.into(), &mut buf); + res.extend_from_slice(codec); + let mut buf = varint_encode::u64_buffer(); + let mh_type = varint_encode::u64(self.mh_type.into(), &mut buf); + res.extend_from_slice(mh_type); + let mut buf = varint_encode::u64_buffer(); + let mh_len = varint_encode::u64(self.mh_len as u64, &mut buf); + res.extend_from_slice(mh_len); + res + } +} + +/// Network behaviour that handles sending and receiving IPFS blocks. +pub struct Bitswap { + client: Arc>, + ready_blocks: VecDeque<(PeerId, BitswapMessage)>, +} + +impl Bitswap { + /// Create a new instance of the bitswap protocol handler. + pub fn new(client: Arc>) -> Self { + Bitswap { + client, + ready_blocks: Default::default(), + } + } +} + +impl NetworkBehaviour for Bitswap { + type ProtocolsHandler = OneShotHandler; + type OutEvent = void::Void; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + Default::default() + } + + fn addresses_of_peer(&mut self, _peer: &PeerId) -> Vec { + Vec::new() + } + + fn inject_connected(&mut self, _peer: &PeerId) { + } + + fn inject_disconnected(&mut self, _peer: &PeerId) { + } + + fn inject_event(&mut self, peer: PeerId, _connection: ConnectionId, message: HandlerEvent) { + let request = match message { + HandlerEvent::ResponseSent => return, + HandlerEvent::Request(msg) => msg, + }; + trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer); + if self.ready_blocks.len() > MAX_RESPONSE_QUEUE { + debug!(target: LOG_TARGET, "Ignored request: queue is full"); + return; + } + let mut response = BitswapMessage { + wantlist: None, + blocks: Default::default(), + payload: Default::default(), + block_presences: Default::default(), + pending_bytes: 0, + }; + let wantlist = match request.wantlist { + Some(wantlist) => wantlist, + None => { + debug!( + target: LOG_TARGET, + "Unexpected bitswap message from {}", + peer, + ); + return; + } + }; + if wantlist.entries.len() > MAX_WANTED_BLOCKS { + trace!(target: LOG_TARGET, "Ignored request: too many entries"); + return; + } + for entry in wantlist.entries { + let cid = match cid::Cid::read_bytes(entry.block.as_slice()) { + Ok(cid) => cid, + Err(e) => { + trace!(target: LOG_TARGET, "Bad CID {:?}: {:?}", entry.block, e); + continue; + } + }; + if cid.version() != cid::Version::V1 + || cid.hash().code() != u64::from(cid::multihash::Code::Blake2b256) + || cid.hash().size() != 32 + { + debug!(target: LOG_TARGET, "Ignoring unsupported CID {}: {}", peer, cid); + continue + } + let mut hash = B::Hash::default(); + hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]); + let extrinsic = match self.client.extrinsic(&hash) { + Ok(ex) => ex, + Err(e) => { + error!(target: LOG_TARGET, "Error retrieving extrinsic {}: {}", hash, e); + None + } + }; + match extrinsic { + Some(extrinsic) => { + trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash); + if entry.want_type == WantType::Block as i32 { + let prefix = Prefix { + version: cid.version(), + codec: cid.codec(), + mh_type: cid.hash().code(), + mh_len: cid.hash().size(), + }; + response.payload.push(MessageBlock { + prefix: prefix.to_bytes(), + data: extrinsic.encode(), + }); + } else { + response.block_presences.push(BlockPresence { + r#type: BlockPresenceType::Have as i32, + cid: cid.to_bytes(), + }); + } + }, + None => { + trace!(target: LOG_TARGET, "Missing CID {:?}, hash {:?}", cid, hash); + if entry.send_dont_have { + response.block_presences.push(BlockPresence { + r#type: BlockPresenceType::DontHave as i32, + cid: cid.to_bytes(), + }); + } + } + } + } + trace!(target: LOG_TARGET, "Response: {:?}", response); + self.ready_blocks.push_back((peer, response)); + } + + fn poll(&mut self, _ctx: &mut Context, _: &mut impl PollParameters) -> Poll< + NetworkBehaviourAction< + <::Handler as ProtocolsHandler>::InEvent, + Self::OutEvent, + >, + > { + if let Some((peer_id, message)) = self.ready_blocks.pop_front() { + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + peer_id: peer_id.clone(), + handler: NotifyHandler::Any, + event: message, + }) + } + Poll::Pending + } +} + +/// Bitswap protocol error. +#[derive(derive_more::Display, derive_more::From)] +pub enum BitswapError { + /// Protobuf decoding error. + #[display(fmt = "Failed to decode request: {}.", _0)] + DecodeProto(prost::DecodeError), + /// Protobuf encoding error. + #[display(fmt = "Failed to encode response: {}.", _0)] + EncodeProto(prost::EncodeError), + /// Client backend error. + Client(sp_blockchain::Error), + /// Error parsing CID + BadCid(cid::Error), + /// Packet read error. + Read(upgrade::ReadOneError), + /// Error sending response. + #[display(fmt = "Failed to send response.")] + SendResponse, +} diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 29a0128b87ea7..5a2327dda1308 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -420,6 +420,8 @@ pub struct NetworkConfiguration { /// Require iterative Kademlia DHT queries to use disjoint paths for increased resiliency in /// the presence of potentially adversarial nodes. pub kademlia_disjoint_query_paths: bool, + /// Enable serving block data over IPFS bitswap. + pub ipfs_server: bool, /// Size of Yamux receive window of all substreams. `None` for the default (256kiB). /// Any value less than 256kiB is invalid. @@ -472,6 +474,7 @@ impl NetworkConfiguration { allow_non_globals_in_dht: false, kademlia_disjoint_query_paths: false, yamux_window_size: None, + ipfs_server: false, } } diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 007928ad425f7..a64be19d87671 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -258,6 +258,7 @@ mod transport; mod utils; pub mod block_request_handler; +pub mod bitswap; pub mod light_client_requests; pub mod config; pub mod error; diff --git a/client/network/src/schema.rs b/client/network/src/schema.rs index 5b9a70b0cd5d9..d4572fca7594c 100644 --- a/client/network/src/schema.rs +++ b/client/network/src/schema.rs @@ -24,3 +24,7 @@ pub mod v1 { include!(concat!(env!("OUT_DIR"), "/api.v1.light.rs")); } } + +pub mod bitswap { + include!(concat!(env!("OUT_DIR"), "/bitswap.message.rs")); +} diff --git a/client/network/src/schema/bitswap.v1.2.0.proto b/client/network/src/schema/bitswap.v1.2.0.proto new file mode 100644 index 0000000000000..a4138b516d63d --- /dev/null +++ b/client/network/src/schema/bitswap.v1.2.0.proto @@ -0,0 +1,43 @@ +syntax = "proto3"; + +package bitswap.message; + +message Message { + message Wantlist { + enum WantType { + Block = 0; + Have = 1; + } + + message Entry { + bytes block = 1; // the block cid (cidV0 in bitswap 1.0.0, cidV1 in bitswap 1.1.0) + int32 priority = 2; // the priority (normalized). default to 1 + bool cancel = 3; // whether this revokes an entry + WantType wantType = 4; // Note: defaults to enum 0, ie Block + bool sendDontHave = 5; // Note: defaults to false + } + + repeated Entry entries = 1; // a list of wantlist entries + bool full = 2; // whether this is the full wantlist. default to false + } + + message Block { + bytes prefix = 1; // CID prefix (cid version, multicodec and multihash prefix (type + length) + bytes data = 2; + } + + enum BlockPresenceType { + Have = 0; + DontHave = 1; + } + message BlockPresence { + bytes cid = 1; + BlockPresenceType type = 2; + } + + Wantlist wantlist = 1; + repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0 + repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0 + repeated BlockPresence blockPresences = 4; + int32 pendingBytes = 5; +} diff --git a/client/network/src/service.rs b/client/network/src/service.rs index cb1cc4f3b77a6..d099c90cb594a 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -50,6 +50,7 @@ use crate::{ sync::SyncState, }, transport, ReputationChange, + bitswap::Bitswap, }; use futures::{channel::oneshot, prelude::*}; use libp2p::{PeerId, multiaddr, Multiaddr}; @@ -248,6 +249,7 @@ impl NetworkWorker { let is_major_syncing = Arc::new(AtomicBool::new(false)); // Build the swarm. + let client = params.chain.clone(); let (mut swarm, bandwidth): (Swarm, _) = { let user_agent = format!( "{} ({})", @@ -334,6 +336,7 @@ impl NetworkWorker { }; let behaviour = { + let bitswap = if params.network_config.ipfs_server { Some(Bitswap::new(client)) } else { None }; let result = Behaviour::new( protocol, params.role, @@ -342,6 +345,7 @@ impl NetworkWorker { light_client_request_sender, discovery_config, params.block_request_protocol_config, + bitswap, params.light_client_request_protocol_config, params.network_config.request_response_protocols, ); @@ -1613,11 +1617,11 @@ impl Future for NetworkWorker { let reason = match cause { Some(ConnectionError::IO(_)) => "transport-error", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::B(EitherError::A( - PingFailure::Timeout))))))) => "ping-timeout", + EitherError::A(EitherError::B(EitherError::A( + PingFailure::Timeout)))))))) => "ping-timeout", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A( - NotifsHandlerError::SyncNotificationsClogged)))))) => "sync-notifications-clogged", + EitherError::A(EitherError::A( + NotifsHandlerError::SyncNotificationsClogged))))))) => "sync-notifications-clogged", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error", Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout", None => "actively-closed", diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index f337452e9dc84..8cb0e304cdad5 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -1919,6 +1919,14 @@ impl BlockBackend for Client fn block_hash(&self, number: NumberFor) -> sp_blockchain::Result> { self.backend.blockchain().hash(number) } + + fn extrinsic(&self, hash: &Block::Hash) -> sp_blockchain::Result> { + self.backend.blockchain().extrinsic(hash) + } + + fn have_extrinsic(&self, hash: &Block::Hash) -> sp_blockchain::Result { + self.backend.blockchain().have_extrinsic(hash) + } } impl backend::AuxStore for Client diff --git a/primitives/blockchain/src/backend.rs b/primitives/blockchain/src/backend.rs index b50545b1a20af..b5efcfb02198a 100644 --- a/primitives/blockchain/src/backend.rs +++ b/primitives/blockchain/src/backend.rs @@ -215,6 +215,17 @@ pub trait Backend: HeaderBackend + HeaderMetadata Result::Extrinsic>>; + + /// Check if extrinsic exists. + fn have_extrinsic(&self, hash: &Block::Hash) -> Result { + Ok(self.extrinsic(hash)?.is_some()) + } } /// Provides access to the optional cache. diff --git a/primitives/database/src/lib.rs b/primitives/database/src/lib.rs index 94fe16ce01db5..7107ea25c02c0 100644 --- a/primitives/database/src/lib.rs +++ b/primitives/database/src/lib.rs @@ -115,6 +115,11 @@ pub trait Database: Send + Sync { /// `key` is not currently in the database. fn get(&self, col: ColumnId, key: &[u8]) -> Option>; + /// Check if the value exists in the database without retrieving it. + fn contains(&self, col: ColumnId, key: &[u8]) -> bool { + self.get(col, key).is_some() + } + /// Call `f` with the value previously stored against `key`. /// /// This may be faster than `get` since it doesn't allocate.