From 47cfe193b8bfbd2f1931124ea8a2972035bbe523 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 18 Jan 2021 11:15:00 +0300 Subject: [PATCH 1/9] IPFS server for transactions --- Cargo.lock | 79 ++++ client/api/src/client.rs | 11 + client/api/src/in_mem.rs | 7 + client/db/src/lib.rs | 34 +- client/light/src/blockchain.rs | 7 + client/network/Cargo.toml | 1 + client/network/build.rs | 3 +- client/network/src/behaviour.rs | 4 + client/network/src/bitswap.rs | 339 ++++++++++++++++++ client/network/src/lib.rs | 1 + client/network/src/schema.rs | 4 + .../network/src/schema/bitswap.v1.2.0.proto | 43 +++ client/network/src/service.rs | 12 +- client/service/src/client/client.rs | 8 + primitives/blockchain/src/backend.rs | 11 + primitives/database/src/lib.rs | 5 + 16 files changed, 549 insertions(+), 20 deletions(-) create mode 100644 client/network/src/bitswap.rs create mode 100644 client/network/src/schema/bitswap.v1.2.0.proto diff --git a/Cargo.lock b/Cargo.lock index 32159c6936666..ca287ec9f3e84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -368,6 +368,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base-x" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4521f3e3d031370679b3b140beb36dfe4801b09ac77e30c61941f97df3ef28b" + [[package]] name = "base58" version = "0.1.0" @@ -468,6 +474,32 @@ dependencies = [ "constant_time_eq", ] +[[package]] +name = "blake2s_simd" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e461a7034e85b211a4acb57ee2e6730b32912b06c08cc242243c39fc21ae6a2" +dependencies = [ + "arrayref", + "arrayvec 0.5.2", + "constant_time_eq", +] + +[[package]] +name = "blake3" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9ff35b701f3914bdb8fad3368d822c766ef2858b2583198e41639b936f09d3f" +dependencies = [ + "arrayref", + "arrayvec 0.5.2", + "cc", + "cfg-if 0.1.10", + "constant_time_eq", + "crypto-mac 0.8.0", + "digest 0.9.0", +] + [[package]] name = "block-buffer" version = "0.7.3" @@ -710,6 +742,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "cid" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d88f30b1e74e7063df5711496f3ee6e74a9735d62062242d70cddf77717f18e" +dependencies = [ + "multibase", + "multihash", + "unsigned-varint 0.5.1", +] + [[package]] name = "cipher" version = "0.2.5" @@ -1167,6 +1210,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "993a608597367c6377b258c25d7120740f00ed23a2252b729b1932dd7866f908" +[[package]] +name = "data-encoding-macro" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a94feec3d2ba66c0b6621bca8bc6f68415b1e5c69af3586fdd0af9fd9f29b17" +dependencies = [ + "data-encoding", + "data-encoding-macro-internal", +] + +[[package]] +name = "data-encoding-macro-internal" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0f83e699727abca3c56e187945f303389590305ab2f0185ea445aa66e8d5f2a" +dependencies = [ + "data-encoding", + "syn", +] + [[package]] name = "derive_more" version = "0.99.11" @@ -3577,16 +3640,31 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0debeb9fcf88823ea64d64e4a815ab1643f33127d995978e099942ce38f25238" +[[package]] +name = "multibase" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b78c60039650ff12e140ae867ef5299a58e19dded4d334c849dc7177083667e2" +dependencies = [ + "base-x", + "data-encoding", + "data-encoding-macro", +] + [[package]] name = "multihash" version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb63389ee5fcd4df3f8727600f4a0c3df53c541f0ed4e8b50a9ae51a80fc1efe" dependencies = [ + "blake2b_simd", + "blake2s_simd", + "blake3", "digest 0.9.0", "generic-array 0.14.4", "multihash-derive", "sha2 0.9.2", + "sha3", "unsigned-varint 0.5.1", ] @@ -7161,6 +7239,7 @@ dependencies = [ "bitflags", "bs58", "bytes 1.0.1", + "cid", "derive_more", "either", "erased-serde", diff --git a/client/api/src/client.rs b/client/api/src/client.rs index 4dc2b6bb524e4..990a7908b62bb 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -95,6 +95,17 @@ pub trait BlockBackend { /// Get block hash by number. fn block_hash(&self, number: NumberFor) -> sp_blockchain::Result>; + + /// Get single extrinsic by hash. + fn extrinsic( + &self, + hash: &Block::Hash, + ) -> sp_blockchain::Result::Extrinsic>>; + + /// Check if extrinsic exists. + fn have_extrinsic(&self, hash: &Block::Hash) -> sp_blockchain::Result { + Ok(self.extrinsic(hash)?.is_some()) + } } /// Provide a list of potential uncle headers for a given block. diff --git a/client/api/src/in_mem.rs b/client/api/src/in_mem.rs index cef52982f167b..b5a1fac83437f 100644 --- a/client/api/src/in_mem.rs +++ b/client/api/src/in_mem.rs @@ -386,6 +386,13 @@ impl blockchain::Backend for Blockchain { fn children(&self, _parent_hash: Block::Hash) -> sp_blockchain::Result> { unimplemented!() } + + fn extrinsic( + &self, + _hash: &Block::Hash, + ) -> sp_blockchain::Result::Extrinsic>> { + unimplemented!() + } } impl blockchain::ProvideCache for Blockchain { diff --git a/client/db/src/lib.rs b/client/db/src/lib.rs index a976cbc2ce8d0..6654083939dae 100644 --- a/client/db/src/lib.rs +++ b/client/db/src/lib.rs @@ -448,20 +448,6 @@ impl BlockchainDb { header.digest().log(DigestItem::as_changes_trie_root) .cloned())) } - - fn extrinsic(&self, hash: &Block::Hash) -> ClientResult> { - match self.db.get(columns::TRANSACTION, hash.as_ref()) { - Some(ex) => { - match Decode::decode(&mut &ex[..]) { - Ok(ex) => Ok(Some(ex)), - Err(err) => Err(sp_blockchain::Error::Backend( - format!("Error decoding extrinsic {}: {}", hash, err) - )), - } - }, - None => Ok(None), - } - } } impl sc_client_api::blockchain::HeaderBackend for BlockchainDb { @@ -532,7 +518,7 @@ impl sc_client_api::blockchain::Backend for BlockchainDb::decode(&mut &body[..]) { Ok(hashes) => { let extrinsics: ClientResult> = hashes.into_iter().map( - |h| self.extrinsic(&h) .and_then(|maybe_ex| maybe_ex.ok_or_else( + |h| self.extrinsic(&h).and_then(|maybe_ex| maybe_ex.ok_or_else( || sp_blockchain::Error::Backend( format!("Missing transaction: {}", h)))) ).collect(); @@ -576,6 +562,24 @@ impl sc_client_api::blockchain::Backend for BlockchainDb ClientResult> { children::read_children(&*self.db, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash) } + + fn extrinsic(&self, hash: &Block::Hash) -> ClientResult> { + match self.db.get(columns::TRANSACTION, hash.as_ref()) { + Some(ex) => { + match Decode::decode(&mut &ex[..]) { + Ok(ex) => Ok(Some(ex)), + Err(err) => Err(sp_blockchain::Error::Backend( + format!("Error decoding extrinsic {}: {}", hash, err) + )), + } + }, + None => Ok(None), + } + } + + fn have_extrinsic(&self, hash: &Block::Hash) -> ClientResult { + Ok(self.db.contains(columns::TRANSACTION, hash.as_ref())) + } } impl sc_client_api::blockchain::ProvideCache for BlockchainDb { diff --git a/client/light/src/blockchain.rs b/client/light/src/blockchain.rs index f682e6e35b3d0..bcabc365676a5 100644 --- a/client/light/src/blockchain.rs +++ b/client/light/src/blockchain.rs @@ -128,6 +128,13 @@ impl BlockchainBackend for Blockchain where Block: BlockT, S fn children(&self, _parent_hash: Block::Hash) -> ClientResult> { Err(ClientError::NotAvailableOnLightClient) } + + fn extrinsic( + &self, + _hash: &Block::Hash, + ) -> ClientResult::Extrinsic>> { + Err(ClientError::NotAvailableOnLightClient) + } } impl, Block: BlockT> ProvideCache for Blockchain { diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index bf948ff4dd37d..215db2a0650fa 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -21,6 +21,7 @@ async-trait = "0.1" async-std = "1.6.5" bitflags = "1.2.0" bs58 = "0.4.0" +cid = "0.6.0" bytes = "1" codec = { package = "parity-scale-codec", version = "1.3.6", features = ["derive"] } derive_more = "0.99.2" diff --git a/client/network/build.rs b/client/network/build.rs index 2ccc72d99df96..0eea622e87574 100644 --- a/client/network/build.rs +++ b/client/network/build.rs @@ -1,6 +1,7 @@ const PROTOS: &[&str] = &[ "src/schema/api.v1.proto", - "src/schema/light.v1.proto" + "src/schema/light.v1.proto", + "src/schema/bitswap.v1.2.0.proto", ]; fn main() { diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index de983bd7139d7..b83adc8de3072 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -18,6 +18,7 @@ use crate::{ config::{ProtocolId, Role}, light_client_handler, peer_info, request_responses, + bitswap::Bitswap, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol}, ObservedRole, DhtEvent, ExHashT, @@ -57,6 +58,7 @@ pub struct Behaviour { peer_info: peer_info::PeerInfoBehaviour, /// Discovers nodes of the network. discovery: DiscoveryBehaviour, + bitswap: Bitswap, /// Generic request-reponse protocols. request_responses: request_responses::RequestResponsesBehaviour, /// Light client request handling. @@ -178,6 +180,7 @@ impl Behaviour { disco_config: DiscoveryConfig, // Block request protocol config. block_request_protocol_config: request_responses::ProtocolConfig, + bitswap: Bitswap, // All remaining request protocol configs. mut request_response_protocols: Vec, ) -> Result { @@ -189,6 +192,7 @@ impl Behaviour { substrate, peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key), discovery: disco_config.finish(), + bitswap, request_responses: request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?, light_client_handler, diff --git a/client/network/src/bitswap.rs b/client/network/src/bitswap.rs new file mode 100644 index 0000000000000..940e73c64860a --- /dev/null +++ b/client/network/src/bitswap.rs @@ -0,0 +1,339 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Bitswap server for substrate. +//! +//! Allows querying transactions by hash over standard bitswap protocol +//! Only supports bitswap 1.2.0. +//! + + +use std::collections::VecDeque; +use std::io; +use std::sync::Arc; +use std::task::{Context, Poll}; +use cid::{self, Version}; +use codec::Encode; +use core::pin::Pin; +use futures::Future; +use futures::io::{AsyncRead, AsyncWrite}; +use libp2p::core::{ + connection::ConnectionId, Multiaddr, PeerId, + upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo, +}; +use libp2p::swarm::{ + NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + ProtocolsHandler, IntoProtocolsHandler, OneShotHandler, +}; +use log::{error, debug, trace}; +use prost::Message; +use sp_runtime::traits::{Block as BlockT}; +use unsigned_varint::{encode as varint_encode}; +use crate::chain::Client; +use crate::schema::bitswap::{ + Message as BitswapMessage, + message::{wantlist::WantType, Block as MessageBlock, BlockPresenceType, BlockPresence}, +}; + +const LOG_TARGET: &str = "bitswap"; + +// Undocumented, but according to JS the bitswap messages have a max size of 512*1024 bytes +// https://github.com/ipfs/js-ipfs-bitswap/blob/d8f80408aadab94c962f6b88f343eb9f39fa0fcc/src/decision-engine/index.js#L16 +// We set it to the same value as max substrate protocol message +const MAX_PACKET_SIZE: usize = 16 * 1024 * 1024; + +// Max number of queued responses before denying requests. +const MAX_RESPONSE_QUEUE: usize = 20; +// Max number of blocks per wantlist +const MAX_WANTED_BLOCKS: usize = 16; + +const PROTOCOL_NAME: &'static [u8] = b"/ipfs/bitswap/1.2.0"; + +type FutureResult = Pin> + Send>>; + +/// Bitswap protocol config +#[derive(Clone, Copy, Debug, Default)] +pub struct BitswapConfig; + +impl UpgradeInfo for BitswapConfig { + type Info = &'static [u8]; + type InfoIter = std::iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + std::iter::once(PROTOCOL_NAME) + } +} + +impl InboundUpgrade for BitswapConfig +where + TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, +{ + type Output = BitswapMessage; + type Error = BitswapError; + type Future = FutureResult; + + fn upgrade_inbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future { + Box::pin(async move { + let packet = upgrade::read_one(&mut socket, MAX_PACKET_SIZE).await?; + let message: BitswapMessage = Message::decode(packet.as_slice())?; + Ok(message) + }) + } +} + +impl UpgradeInfo for BitswapMessage { + type Info = &'static [u8]; + type InfoIter = std::iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + std::iter::once(PROTOCOL_NAME) + } +} + +impl OutboundUpgrade for BitswapMessage +where + TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, +{ + type Output = (); + type Error = io::Error; + type Future = FutureResult; + + fn upgrade_outbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future { + Box::pin(async move { + let mut data = Vec::with_capacity(self.encoded_len()); + self.encode(&mut data)?; + upgrade::write_one(&mut socket, data).await + }) + } +} + +/// Internal protocol handler event. +#[derive(Debug)] +pub enum HandlerEvent { + /// We received a `BitswapMessage` from a remote. + Request(BitswapMessage), + /// We successfully sent a `BitswapMessage`. + ResponseSent, +} + +impl From for HandlerEvent { + fn from(message: BitswapMessage) -> Self { + Self::Request(message) + } +} + +impl From<()> for HandlerEvent { + fn from(_: ()) -> Self { + Self::ResponseSent + } +} + +/// Prefix represents all metadata of a CID, without the actual content. +#[derive(PartialEq, Eq, Clone, Debug)] +struct Prefix { + /// The version of CID. + pub version: Version, + /// The codec of CID. + pub codec: u64, + /// The multihash type of CID. + pub mh_type: u64, + /// The multihash length of CID. + pub mh_len: u8, +} + +impl Prefix { + /// Convert the prefix to encoded bytes. + pub fn to_bytes(&self) -> Vec { + let mut res = Vec::with_capacity(4); + + let mut buf = varint_encode::u64_buffer(); + let version = varint_encode::u64(self.version.into(), &mut buf); + res.extend_from_slice(version); + let mut buf = varint_encode::u64_buffer(); + let codec = varint_encode::u64(self.codec.into(), &mut buf); + res.extend_from_slice(codec); + let mut buf = varint_encode::u64_buffer(); + let mh_type = varint_encode::u64(self.mh_type.into(), &mut buf); + res.extend_from_slice(mh_type); + let mut buf = varint_encode::u64_buffer(); + let mh_len = varint_encode::u64(self.mh_len as u64, &mut buf); + res.extend_from_slice(mh_len); + + res + } +} + +/// Network behaviour that handles sending and receiving IPFS blocks. +pub struct Bitswap { + client: Arc>, + ready_blocks: VecDeque<(PeerId, BitswapMessage)>, +} + +impl Bitswap { + /// Create a new instance of the bitswap protocol handler. + pub fn new(client: Arc>) -> Self { + Bitswap { + client, + ready_blocks: Default::default(), + } + } +} + +impl NetworkBehaviour for Bitswap { + type ProtocolsHandler = OneShotHandler; + type OutEvent = void::Void; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + Default::default() + } + + fn addresses_of_peer(&mut self, _peer: &PeerId) -> Vec { + Vec::new() + } + + fn inject_connected(&mut self, _peer: &PeerId) { + } + + fn inject_disconnected(&mut self, _peer: &PeerId) { + } + + fn inject_event(&mut self, peer: PeerId, _connection: ConnectionId, message: HandlerEvent) { + let request = match message { + HandlerEvent::ResponseSent => return, + HandlerEvent::Request(msg) => msg, + }; + log::trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer); + if self.ready_blocks.len() > MAX_RESPONSE_QUEUE { + debug!(target: LOG_TARGET, "Ignored request: queue is full"); + return; + } + let mut response = BitswapMessage { + wantlist: None, + blocks: Default::default(), + payload: Default::default(), + block_presences: Default::default(), + pending_bytes: 0, + }; + let wantlist = match request.wantlist { + Some(wantlist) => wantlist, + None => { + debug!( + target: LOG_TARGET, + "Unexpected bitswap message from {}", + peer, + ); + return; + } + }; + if wantlist.entries.len() > MAX_WANTED_BLOCKS { + trace!(target: LOG_TARGET, "Ignored request: too many entries"); + return; + } + for entry in wantlist.entries { + let cid = match cid::Cid::read_bytes(entry.block.as_slice()) { + Ok(cid) => cid, + Err(e) => { + trace!(target: LOG_TARGET, "Bad CID {:?}: {:?}", entry.block, e); + continue; + } + }; + if cid.hash().code() != u64::from(cid::multihash::Code::Blake2b256) + || cid.hash().size() != 32 + { + debug!(target: LOG_TARGET, "Ignoring unsupported CID {}: {}", peer, cid); + continue + } + let mut hash = B::Hash::default(); + hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]); + let extrinsic = match self.client.extrinsic(&hash) { + Ok(ex) => ex, + Err(e) => { + error!(target: LOG_TARGET, "Error retrieving extrinsic {}: {}", hash, e); + None + } + }; + match extrinsic { + Some(extrinsic) => { + trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash); + if entry.want_type == WantType::Block as i32 { + let prefix = Prefix { + version: cid.version(), + codec: cid.codec(), + mh_type: cid.hash().code(), + mh_len: cid.hash().size(), + }; + response.payload.push(MessageBlock { + prefix: prefix.to_bytes(), + data: extrinsic.encode(), + }); + } else { + response.block_presences.push(BlockPresence { + r#type: BlockPresenceType::Have as i32, + cid: cid.to_bytes(), + }); + } + }, + None => { + trace!(target: LOG_TARGET, "Missing CID {:?}, hash {:?}", cid, hash); + if entry.send_dont_have { + response.block_presences.push(BlockPresence { + r#type: BlockPresenceType::DontHave as i32, + cid: cid.to_bytes(), + }); + } + } + } + } + trace!(target: LOG_TARGET, "Response: {:?}", response); + self.ready_blocks.push_back((peer, response)); + } + + fn poll(&mut self, _ctx: &mut Context, _: &mut impl PollParameters) -> Poll< + NetworkBehaviourAction< + <::Handler as ProtocolsHandler>::InEvent, + Self::OutEvent, + >, + > { + if let Some((peer_id, message)) = self.ready_blocks.pop_front() { + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + peer_id: peer_id.clone(), + handler: NotifyHandler::Any, + event: message, + }) + } + Poll::Pending + } +} + +/// Bitswap protocol error. +#[derive(derive_more::Display, derive_more::From)] +pub enum BitswapError { + /// Protobuf decoding error. + #[display(fmt = "Failed to decode request: {}.", _0)] + DecodeProto(prost::DecodeError), + /// Protobuf encoding error. + #[display(fmt = "Failed to encode response: {}.", _0)] + EncodeProto(prost::EncodeError), + /// Client backend error. + Client(sp_blockchain::Error), + /// Error parsing CID + BadCid(cid::Error), + /// Packet read error. + Read(upgrade::ReadOneError), + /// Error sending response. + #[display(fmt = "Failed to send response.")] + SendResponse, +} diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index ab7625ff9fe8a..789796a9fe4de 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -259,6 +259,7 @@ mod transport; mod utils; pub mod block_request_handler; +pub mod bitswap; pub mod config; pub mod error; pub mod gossip; diff --git a/client/network/src/schema.rs b/client/network/src/schema.rs index 5b9a70b0cd5d9..d4572fca7594c 100644 --- a/client/network/src/schema.rs +++ b/client/network/src/schema.rs @@ -24,3 +24,7 @@ pub mod v1 { include!(concat!(env!("OUT_DIR"), "/api.v1.light.rs")); } } + +pub mod bitswap { + include!(concat!(env!("OUT_DIR"), "/bitswap.message.rs")); +} diff --git a/client/network/src/schema/bitswap.v1.2.0.proto b/client/network/src/schema/bitswap.v1.2.0.proto new file mode 100644 index 0000000000000..a4138b516d63d --- /dev/null +++ b/client/network/src/schema/bitswap.v1.2.0.proto @@ -0,0 +1,43 @@ +syntax = "proto3"; + +package bitswap.message; + +message Message { + message Wantlist { + enum WantType { + Block = 0; + Have = 1; + } + + message Entry { + bytes block = 1; // the block cid (cidV0 in bitswap 1.0.0, cidV1 in bitswap 1.1.0) + int32 priority = 2; // the priority (normalized). default to 1 + bool cancel = 3; // whether this revokes an entry + WantType wantType = 4; // Note: defaults to enum 0, ie Block + bool sendDontHave = 5; // Note: defaults to false + } + + repeated Entry entries = 1; // a list of wantlist entries + bool full = 2; // whether this is the full wantlist. default to false + } + + message Block { + bytes prefix = 1; // CID prefix (cid version, multicodec and multihash prefix (type + length) + bytes data = 2; + } + + enum BlockPresenceType { + Have = 0; + DontHave = 1; + } + message BlockPresence { + bytes cid = 1; + BlockPresenceType type = 2; + } + + Wantlist wantlist = 1; + repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0 + repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0 + repeated BlockPresence blockPresences = 4; + int32 pendingBytes = 5; +} diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 09acef62e7784..00b6eb3886bc5 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -50,6 +50,7 @@ use crate::{ sync::SyncState, }, transport, ReputationChange, + bitswap::Bitswap, }; use futures::{channel::oneshot, prelude::*}; use libp2p::{PeerId, multiaddr, Multiaddr}; @@ -247,6 +248,7 @@ impl NetworkWorker { let is_major_syncing = Arc::new(AtomicBool::new(false)); // Build the swarm. + let client = params.chain.clone(); let (mut swarm, bandwidth): (Swarm, _) = { let user_agent = format!( "{} ({})", @@ -328,6 +330,7 @@ impl NetworkWorker { }; let behaviour = { + let bitswap = Bitswap::new(client); let result = Behaviour::new( protocol, params.role, @@ -336,6 +339,7 @@ impl NetworkWorker { light_client_handler, discovery_config, params.block_request_protocol_config, + bitswap, params.network_config.request_response_protocols, ); @@ -1602,11 +1606,11 @@ impl Future for NetworkWorker { let reason = match cause { Some(ConnectionError::IO(_)) => "transport-error", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::B(EitherError::A( - PingFailure::Timeout)))))))) => "ping-timeout", + EitherError::A(EitherError::A(EitherError::B(EitherError::A( + PingFailure::Timeout))))))))) => "ping-timeout", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::A( - NotifsHandlerError::SyncNotificationsClogged))))))) => "sync-notifications-clogged", + EitherError::A(EitherError::A(EitherError::A( + NotifsHandlerError::SyncNotificationsClogged)))))))) => "sync-notifications-clogged", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error", Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout", None => "actively-closed", diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index f337452e9dc84..8cb0e304cdad5 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -1919,6 +1919,14 @@ impl BlockBackend for Client fn block_hash(&self, number: NumberFor) -> sp_blockchain::Result> { self.backend.blockchain().hash(number) } + + fn extrinsic(&self, hash: &Block::Hash) -> sp_blockchain::Result> { + self.backend.blockchain().extrinsic(hash) + } + + fn have_extrinsic(&self, hash: &Block::Hash) -> sp_blockchain::Result { + self.backend.blockchain().have_extrinsic(hash) + } } impl backend::AuxStore for Client diff --git a/primitives/blockchain/src/backend.rs b/primitives/blockchain/src/backend.rs index b50545b1a20af..b5efcfb02198a 100644 --- a/primitives/blockchain/src/backend.rs +++ b/primitives/blockchain/src/backend.rs @@ -215,6 +215,17 @@ pub trait Backend: HeaderBackend + HeaderMetadata Result::Extrinsic>>; + + /// Check if extrinsic exists. + fn have_extrinsic(&self, hash: &Block::Hash) -> Result { + Ok(self.extrinsic(hash)?.is_some()) + } } /// Provides access to the optional cache. diff --git a/primitives/database/src/lib.rs b/primitives/database/src/lib.rs index 94fe16ce01db5..7107ea25c02c0 100644 --- a/primitives/database/src/lib.rs +++ b/primitives/database/src/lib.rs @@ -115,6 +115,11 @@ pub trait Database: Send + Sync { /// `key` is not currently in the database. fn get(&self, col: ColumnId, key: &[u8]) -> Option>; + /// Check if the value exists in the database without retrieving it. + fn contains(&self, col: ColumnId, key: &[u8]) -> bool { + self.get(col, key).is_some() + } + /// Call `f` with the value previously stored against `key`. /// /// This may be faster than `get` since it doesn't allocate. From b5e51e4535dfe16d16de67cd15a9e106539f8e27 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 22 Jan 2021 23:50:17 +0300 Subject: [PATCH 2/9] Style --- client/network/src/bitswap.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/client/network/src/bitswap.rs b/client/network/src/bitswap.rs index 940e73c64860a..cad2fe4e6eb4b 100644 --- a/client/network/src/bitswap.rs +++ b/client/network/src/bitswap.rs @@ -18,8 +18,7 @@ //! //! Allows querying transactions by hash over standard bitswap protocol //! Only supports bitswap 1.2.0. -//! - +//! CID is expected to reference 256-bit Blake2b transaction hash. use std::collections::VecDeque; use std::io; @@ -158,7 +157,6 @@ impl Prefix { /// Convert the prefix to encoded bytes. pub fn to_bytes(&self) -> Vec { let mut res = Vec::with_capacity(4); - let mut buf = varint_encode::u64_buffer(); let version = varint_encode::u64(self.version.into(), &mut buf); res.extend_from_slice(version); @@ -171,7 +169,6 @@ impl Prefix { let mut buf = varint_encode::u64_buffer(); let mh_len = varint_encode::u64(self.mh_len as u64, &mut buf); res.extend_from_slice(mh_len); - res } } From f2769d189d99155fc76ae956e94ef823d8a73135 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 22 Jan 2021 23:51:20 +0300 Subject: [PATCH 3/9] Indent --- client/network/src/bitswap.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/client/network/src/bitswap.rs b/client/network/src/bitswap.rs index cad2fe4e6eb4b..1fd615bfb9087 100644 --- a/client/network/src/bitswap.rs +++ b/client/network/src/bitswap.rs @@ -304,15 +304,15 @@ impl NetworkBehaviour for Bitswap { Self::OutEvent, >, > { - if let Some((peer_id, message)) = self.ready_blocks.pop_front() { - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id: peer_id.clone(), - handler: NotifyHandler::Any, - event: message, - }) - } - Poll::Pending + if let Some((peer_id, message)) = self.ready_blocks.pop_front() { + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + peer_id: peer_id.clone(), + handler: NotifyHandler::Any, + event: message, + }) } + Poll::Pending + } } /// Bitswap protocol error. From 705d669689ced4a2cb1c6ef944551a5d3c40029a Mon Sep 17 00:00:00 2001 From: arkpar Date: Sat, 23 Jan 2021 19:44:31 +0300 Subject: [PATCH 4/9] Log message --- client/db/src/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/db/src/utils.rs b/client/db/src/utils.rs index baea6aab69fa0..26530e1821662 100644 --- a/client/db/src/utils.rs +++ b/client/db/src/utils.rs @@ -401,7 +401,7 @@ pub fn read_meta(db: &dyn Database, col_header: u32) -> Result< } { let hash = header.hash(); - debug!("DB Opened blockchain db, fetched {} = {:?} ({})", desc, hash, header.number()); + debug!(target: "db", "Opened blockchain db, fetched {} = {:?} ({})", desc, hash, header.number()); Ok((hash, *header.number())) } else { Ok((genesis_hash.clone(), Zero::zero())) From c114caff4e58365f16a48f7639cb1ce3d14c776e Mon Sep 17 00:00:00 2001 From: arkpar Date: Sat, 23 Jan 2021 19:55:03 +0300 Subject: [PATCH 5/9] CLI option --- client/cli/src/params/network_params.rs | 5 +++++ client/network/src/behaviour.rs | 8 ++++---- client/network/src/config.rs | 3 +++ client/network/src/service.rs | 2 +- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/client/cli/src/params/network_params.rs b/client/cli/src/params/network_params.rs index 0b53616b9ed13..5502fa47c3c09 100644 --- a/client/cli/src/params/network_params.rs +++ b/client/cli/src/params/network_params.rs @@ -106,6 +106,10 @@ pub struct NetworkParams { /// security improvements. #[structopt(long)] pub kademlia_disjoint_query_paths: bool, + + /// Join the IPFS network and serve transactions over bitswap protocol. + #[structopt(long)] + pub ipfs_server: bool, } impl NetworkParams { @@ -176,6 +180,7 @@ impl NetworkParams { allow_non_globals_in_dht, kademlia_disjoint_query_paths: self.kademlia_disjoint_query_paths, yamux_window_size: None, + ipfs_server: self.ipfs_server, } } } diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index b83adc8de3072..e9d5d03a10aa4 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -30,7 +30,7 @@ use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; use libp2p::identify::IdentifyInfo; use libp2p::kad::record; -use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; +use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, toggle::Toggle}; use log::debug; use prost::Message; use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}}; @@ -58,7 +58,7 @@ pub struct Behaviour { peer_info: peer_info::PeerInfoBehaviour, /// Discovers nodes of the network. discovery: DiscoveryBehaviour, - bitswap: Bitswap, + bitswap: Toggle>, /// Generic request-reponse protocols. request_responses: request_responses::RequestResponsesBehaviour, /// Light client request handling. @@ -180,7 +180,7 @@ impl Behaviour { disco_config: DiscoveryConfig, // Block request protocol config. block_request_protocol_config: request_responses::ProtocolConfig, - bitswap: Bitswap, + bitswap: Option>, // All remaining request protocol configs. mut request_response_protocols: Vec, ) -> Result { @@ -192,7 +192,7 @@ impl Behaviour { substrate, peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key), discovery: disco_config.finish(), - bitswap, + bitswap: bitswap.into(), request_responses: request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?, light_client_handler, diff --git a/client/network/src/config.rs b/client/network/src/config.rs index c0e2c66482b9a..40d913ce2a346 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -401,6 +401,8 @@ pub struct NetworkConfiguration { /// Require iterative Kademlia DHT queries to use disjoint paths for increased resiliency in the /// presence of potentially adversarial nodes. pub kademlia_disjoint_query_paths: bool, + /// Enable serving block data over IPFS bitswap. + pub ipfs_server: bool, /// Size of Yamux receive window of all substreams. `None` for the default (256kiB). /// Any value less than 256kiB is invalid. @@ -452,6 +454,7 @@ impl NetworkConfiguration { allow_non_globals_in_dht: false, kademlia_disjoint_query_paths: false, yamux_window_size: None, + ipfs_server: false, } } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 00b6eb3886bc5..bcbfb20d7c7ce 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -330,7 +330,7 @@ impl NetworkWorker { }; let behaviour = { - let bitswap = Bitswap::new(client); + let bitswap = if params.network_config.ipfs_server { Some(Bitswap::new(client)) } else { None }; let result = Behaviour::new( protocol, params.role, From 7577f98b54c8b7d1a3eb49bf76a15f942ce803c2 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Wed, 27 Jan 2021 12:14:23 +0300 Subject: [PATCH 6/9] Apply suggestions from code review Co-authored-by: Pierre Krieger --- client/network/src/bitswap.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/network/src/bitswap.rs b/client/network/src/bitswap.rs index 1fd615bfb9087..3d003a8eee1ca 100644 --- a/client/network/src/bitswap.rs +++ b/client/network/src/bitswap.rs @@ -1,4 +1,4 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. +// Copyright 2021 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify @@ -24,7 +24,7 @@ use std::collections::VecDeque; use std::io; use std::sync::Arc; use std::task::{Context, Poll}; -use cid::{self, Version}; +use cid::Version; use codec::Encode; use core::pin::Pin; use futures::Future; @@ -212,7 +212,7 @@ impl NetworkBehaviour for Bitswap { HandlerEvent::ResponseSent => return, HandlerEvent::Request(msg) => msg, }; - log::trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer); + trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer); if self.ready_blocks.len() > MAX_RESPONSE_QUEUE { debug!(target: LOG_TARGET, "Ignored request: queue is full"); return; From 96de0a39ac3a895cea94b38631dd98fb03f9a60c Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 27 Jan 2021 12:55:15 +0300 Subject: [PATCH 7/9] Style --- client/db/src/utils.rs | 8 +++++++- client/network/src/behaviour.rs | 4 +++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/client/db/src/utils.rs b/client/db/src/utils.rs index 26530e1821662..cd9b2a6f56d41 100644 --- a/client/db/src/utils.rs +++ b/client/db/src/utils.rs @@ -401,7 +401,13 @@ pub fn read_meta(db: &dyn Database, col_header: u32) -> Result< } { let hash = header.hash(); - debug!(target: "db", "Opened blockchain db, fetched {} = {:?} ({})", desc, hash, header.number()); + debug!( + target: "db", + "Opened blockchain db, fetched {} = {:?} ({})", + desc, + hash, + header.number() + ); Ok((hash, *header.number())) } else { Ok((genesis_hash.clone(), Zero::zero())) diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 2422e4df7fe26..7bdab815eb941 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -30,7 +30,9 @@ use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; use libp2p::identify::IdentifyInfo; use libp2p::kad::record; -use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, toggle::Toggle}; +use libp2p::swarm::{ + NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, toggle::Toggle +}; use log::debug; use prost::Message; use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}}; From 5c5d5c1bf5665ed7f36a99803ee1efa0a23fd2f4 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 27 Jan 2021 22:32:54 +0300 Subject: [PATCH 8/9] Style --- client/network/src/bitswap.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/network/src/bitswap.rs b/client/network/src/bitswap.rs index 3d003a8eee1ca..69716ebeadcbd 100644 --- a/client/network/src/bitswap.rs +++ b/client/network/src/bitswap.rs @@ -50,7 +50,8 @@ use crate::schema::bitswap::{ const LOG_TARGET: &str = "bitswap"; // Undocumented, but according to JS the bitswap messages have a max size of 512*1024 bytes -// https://github.com/ipfs/js-ipfs-bitswap/blob/d8f80408aadab94c962f6b88f343eb9f39fa0fcc/src/decision-engine/index.js#L16 +// https://github.com/ipfs/js-ipfs-bitswap/blob/ +// d8f80408aadab94c962f6b88f343eb9f39fa0fcc/src/decision-engine/index.js#L16 // We set it to the same value as max substrate protocol message const MAX_PACKET_SIZE: usize = 16 * 1024 * 1024; From 891c80654ab8141db0705672275be8b452ad33ed Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 2 Feb 2021 16:32:06 +0100 Subject: [PATCH 9/9] Minor fixes --- client/api/src/in_mem.rs | 2 +- client/network/src/behaviour.rs | 1 + client/network/src/bitswap.rs | 3 ++- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/client/api/src/in_mem.rs b/client/api/src/in_mem.rs index b5a1fac83437f..c108acc7b43b9 100644 --- a/client/api/src/in_mem.rs +++ b/client/api/src/in_mem.rs @@ -391,7 +391,7 @@ impl blockchain::Backend for Blockchain { &self, _hash: &Block::Hash, ) -> sp_blockchain::Result::Extrinsic>> { - unimplemented!() + unimplemented!("Not supported by the in-mem backend.") } } diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 3af769b12657d..ffa8bb3c417e1 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -61,6 +61,7 @@ pub struct Behaviour { peer_info: peer_info::PeerInfoBehaviour, /// Discovers nodes of the network. discovery: DiscoveryBehaviour, + /// Bitswap server for blockchain data. bitswap: Toggle>, /// Generic request-reponse protocols. request_responses: request_responses::RequestResponsesBehaviour, diff --git a/client/network/src/bitswap.rs b/client/network/src/bitswap.rs index 69716ebeadcbd..7129f3dbe07b1 100644 --- a/client/network/src/bitswap.rs +++ b/client/network/src/bitswap.rs @@ -248,7 +248,8 @@ impl NetworkBehaviour for Bitswap { continue; } }; - if cid.hash().code() != u64::from(cid::multihash::Code::Blake2b256) + if cid.version() != cid::Version::V1 + || cid.hash().code() != u64::from(cid::multihash::Code::Blake2b256) || cid.hash().size() != 32 { debug!(target: LOG_TARGET, "Ignoring unsupported CID {}: {}", peer, cid);