diff --git a/Cargo.lock b/Cargo.lock index 1b8c0775fd..3179594a36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4532,6 +4532,7 @@ dependencies = [ "beserial", "futures-util", "gloo-timers", + "js-sys", "nimiq-blockchain-interface", "nimiq-blockchain-proxy", "nimiq-bls", diff --git a/blockchain/src/history/history_store.rs b/blockchain/src/history/history_store.rs index 6b258eb3dc..b5f13c2e4e 100644 --- a/blockchain/src/history/history_store.rs +++ b/blockchain/src/history/history_store.rs @@ -16,14 +16,13 @@ use nimiq_mmr::mmr::proof::RangeProof; use nimiq_mmr::mmr::MerkleMountainRange; use nimiq_mmr::store::memory::MemoryStore; use nimiq_primitives::policy::Policy; +use nimiq_transaction::history_proof::HistoryTreeProof; use nimiq_transaction::{ extended_transaction::{ExtTxData, ExtendedTransaction}, inherent::InherentType, }; -use crate::history::{ - mmr_store::MMRStore, ordered_hash::OrderedHash, HistoryTreeChunk, HistoryTreeProof, -}; +use crate::history::{mmr_store::MMRStore, ordered_hash::OrderedHash, HistoryTreeChunk}; /// A struct that contains databases to store history trees (which are Merkle Mountain Ranges /// constructed from the list of extended transactions in an epoch) and extended transactions (which diff --git a/blockchain/src/history/mod.rs b/blockchain/src/history/mod.rs index f26cf63559..2d81c5b109 100644 --- a/blockchain/src/history/mod.rs +++ b/blockchain/src/history/mod.rs @@ -1,9 +1,7 @@ pub use history_store::HistoryStore; pub use history_tree_chunk::{HistoryTreeChunk, CHUNK_SIZE}; -pub use history_tree_proof::HistoryTreeProof; mod history_store; mod history_tree_chunk; -mod history_tree_proof; mod mmr_store; mod ordered_hash; diff --git a/consensus/src/consensus/consensus_proxy.rs b/consensus/src/consensus/consensus_proxy.rs index 2c2fc907ec..254f389d60 100644 --- a/consensus/src/consensus/consensus_proxy.rs +++ b/consensus/src/consensus/consensus_proxy.rs @@ -1,14 +1,22 @@ +use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use nimiq_keys::Address; +use nimiq_network_interface::peer_info::Services; +use nimiq_network_interface::request::OutboundRequestError; +use nimiq_network_interface::request::RequestError; +use nimiq_primitives::policy::Policy; +use nimiq_transaction::extended_transaction::ExtendedTransaction; use tokio::sync::broadcast::Sender as BroadcastSender; use tokio_stream::wrappers::BroadcastStream; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_network_interface::network::Network; +use nimiq_network_interface::network::{CloseReason, Network}; use nimiq_primitives::account::AccountType; use nimiq_transaction::{ControlTransactionTopic, Transaction, TransactionTopic}; +use crate::messages::{RequestTransactionsByAddress, RequestTransactionsProof}; use crate::ConsensusEvent; pub struct ConsensusProxy { @@ -44,4 +52,105 @@ impl ConsensusProxy { pub fn subscribe_events(&self) -> BroadcastStream { BroadcastStream::new(self.events.subscribe()) } + + pub async fn request_transactions_by_address( + &self, + address: Address, + max: Option, + ) -> Result, RequestError> { + // First we tell the network to provide us with a vector that contains all the connected peers that support such services + // Note: If the network could not connect to any peer that satisfies our requirement, then an error would be returned + let peers = self + .network + .get_peers_by_services(Services::TRANSACTION_INDEX) + .await + .map_err(|error| { + log::error!( + err = %error, + "The transactions by address request couldn't be fulfilled" + ); + + RequestError::OutboundRequest(OutboundRequestError::SendError) + })?; + + // At this point we obtained a list of connected peers that could satisfy our request, + // so we perform the request to each of those peers: + let mut verified_transactions = HashMap::new(); + + for peer_id in peers { + let response = self + .network + .request::( + RequestTransactionsByAddress { + address: address.clone(), + max, + }, + peer_id, + ) + .await; + + match response { + Ok(response) => { + //Now we request proofs for each transaction we requested. + for transaction in response.transactions { + // If the transaction was already verified, then no need to verify it again + if verified_transactions.contains_key(&transaction.tx_hash()) { + continue; + } + + let response = self + .network + .request::( + RequestTransactionsProof { + hashes: vec![transaction.tx_hash()], + epoch_number: Policy::epoch_at(transaction.block_number), + }, + peer_id, + ) + .await; + match response { + Ok(proof_response) => { + //We verify the transaction using the proof + if let Some(proof) = proof_response.proof { + if let Some(block) = proof_response.block { + let verification_result = proof + .verify(block.history_root().clone()) + .map_or(false, |result| result); + + if verification_result { + verified_transactions + .insert(transaction.tx_hash(), transaction); + } else { + //The proof didnt verify so we disconnect from this peer + self.network + .disconnect_peer(peer_id, CloseReason::Other) + .await; + break; + } + } else { + //If we receive a proof but we do not recieve a block we disconnect from the peer + self.network + .disconnect_peer(peer_id, CloseReason::Other) + .await; + break; + } + } + } + Err(error) => { + //If there was a request error with this peer we dont request anymore proofs from it + log::error!(peer=%peer_id, err=%error,"There was an error requesting transactions proof from peer"); + break; + } + } + } + } + Err(error) => { + //If there was a request error with this peer we log an error + log::error!(peer=%peer_id, err=%error,"There was an error requesting transactions from peer"); + } + } + } + + Ok(verified_transactions.into_values().collect()) + } } diff --git a/consensus/src/messages/mod.rs b/consensus/src/messages/mod.rs index 3686ea02fa..fee2d75e98 100644 --- a/consensus/src/messages/mod.rs +++ b/consensus/src/messages/mod.rs @@ -3,13 +3,12 @@ use std::fmt::{Debug, Formatter}; use beserial::{Deserialize, Serialize}; use nimiq_block::{Block, MacroBlock}; #[cfg(feature = "full")] -use nimiq_blockchain::{HistoryTreeChunk, HistoryTreeProof}; +use nimiq_blockchain::HistoryTreeChunk; use nimiq_hash::Blake2bHash; -#[cfg(feature = "full")] use nimiq_keys::Address; use nimiq_network_interface::request::{RequestCommon, RequestMarker}; -#[cfg(feature = "full")] use nimiq_transaction::extended_transaction::ExtendedTransaction; +use nimiq_transaction::history_proof::HistoryTreeProof; mod handlers; @@ -211,22 +210,19 @@ impl RequestCommon for RequestHead { const MAX_REQUESTS: u32 = MAX_REQUEST_RESPONSE_HEAD; } -#[cfg(feature = "full")] #[derive(Serialize, Deserialize)] pub struct ResponseTransactionsProof { - proof: Option, - block: Option, + pub proof: Option, + pub block: Option, } -#[cfg(feature = "full")] #[derive(Clone, Debug, Serialize, Deserialize)] pub struct RequestTransactionsProof { #[beserial(len_type(u16, limit = 128))] - hashes: Vec, - epoch_number: u32, + pub hashes: Vec, + pub epoch_number: u32, } -#[cfg(feature = "full")] impl RequestCommon for RequestTransactionsProof { type Kind = RequestMarker; const TYPE_ID: u16 = 213; @@ -234,14 +230,12 @@ impl RequestCommon for RequestTransactionsProof { const MAX_REQUESTS: u32 = MAX_REQUEST_TRANSACTIONS_PROOF; } -#[cfg(feature = "full")] #[derive(Clone, Debug, Serialize, Deserialize)] pub struct RequestTransactionsByAddress { - address: Address, - max: Option, + pub address: Address, + pub max: Option, } -#[cfg(feature = "full")] impl RequestCommon for RequestTransactionsByAddress { type Kind = RequestMarker; const TYPE_ID: u16 = 214; @@ -249,9 +243,8 @@ impl RequestCommon for RequestTransactionsByAddress { const MAX_REQUESTS: u32 = MAX_REQUEST_TRANSACTIONS_BY_ADDRESS; } -#[cfg(feature = "full")] #[derive(Serialize, Deserialize)] pub struct ResponseTransactionsByAddress { #[beserial(len_type(u16, limit = 128))] - transactions: Vec, + pub transactions: Vec, } diff --git a/network-interface/src/network.rs b/network-interface/src/network.rs index 85b5970b8f..ee8e633b01 100644 --- a/network-interface/src/network.rs +++ b/network-interface/src/network.rs @@ -81,6 +81,12 @@ pub trait Network: Send + Sync + Unpin + 'static { /// If the peer isn't found, `None` is returned. fn get_peer_info(&self, peer_id: Self::PeerId) -> Option; + /// Gets the set of connected peers that provide the supplied services + async fn get_peers_by_services( + &self, + services: Services, + ) -> Result, Self::Error>; + /// Returns true when the given peer provides the services flags that are required by us fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool; diff --git a/network-libp2p/src/connection_pool/behaviour.rs b/network-libp2p/src/connection_pool/behaviour.rs index d6ff4c2f9f..e98c78025f 100644 --- a/network-libp2p/src/connection_pool/behaviour.rs +++ b/network-libp2p/src/connection_pool/behaviour.rs @@ -288,6 +288,31 @@ impl ConnectionPoolBehaviour { .choose_multiple(&mut thread_rng(), num_peers) } + pub fn choose_peers_to_dial_by_services(&self, services: Services) -> Vec { + let num_peers = usize::min( + self.config.peer_count_desired - self.peer_ids.num_connected(), + self.config.dialing_count_max - self.peer_ids.num_dialing(), + ); + let contacts = self.contacts.read(); + let own_contact = contacts.get_own_contact(); + let own_peer_id = own_contact.peer_id(); + + contacts + .query(services) + .filter_map(|contact| { + let peer_id = contact.peer_id(); + if peer_id != own_peer_id + && self.peer_ids.can_dial(peer_id) + && contact.addresses().count() > 0 + { + Some(*peer_id) + } else { + None + } + }) + .choose_multiple(&mut thread_rng(), num_peers) + } + fn choose_seeds_to_dial(&self) -> Vec { // We prefer to connect to non-seed peers. Thus, we only choose any seeds here if we're // not already dialing any peers and at most one seed at a time. diff --git a/network-libp2p/src/error.rs b/network-libp2p/src/error.rs index fa125912db..2774c44f2d 100644 --- a/network-libp2p/src/error.rs +++ b/network-libp2p/src/error.rs @@ -14,6 +14,9 @@ pub enum NetworkError { #[error("Network action was cancelled")] Cancelled, + #[error("We coudnt find any peer that satisfies the desired services")] + PeersNotFound, + #[error("Serialization error: {0}")] Serialization(#[from] beserial::SerializingError), diff --git a/network-libp2p/src/network.rs b/network-libp2p/src/network.rs index b3c415dc81..c48c27b10f 100644 --- a/network-libp2p/src/network.rs +++ b/network-libp2p/src/network.rs @@ -136,6 +136,10 @@ pub(crate) enum NetworkAction { ListenOn { listen_addresses: Vec, }, + ConnectPeersByServices { + services: Services, + output: oneshot::Sender>, + }, StartConnecting, DisconnectPeer { peer_id: PeerId, @@ -1270,6 +1274,23 @@ impl Network { NetworkAction::StartConnecting => { swarm.behaviour_mut().pool.start_connecting(); } + NetworkAction::ConnectPeersByServices { services, output } => { + let peers_candidates = swarm + .behaviour_mut() + .pool + .choose_peers_to_dial_by_services(services); + let mut successful_peers = vec![]; + + for peer_id in peers_candidates { + if Swarm::dial(swarm, DialOpts::peer_id(peer_id).build()).is_ok() { + successful_peers.push(peer_id); + } + } + + if output.send(successful_peers).is_err() { + error!("Could not send sucessful peers vector"); + } + } NetworkAction::DisconnectPeer { peer_id } => { if swarm.disconnect_peer_id(peer_id).is_err() { warn!(%peer_id, "Peer already closed"); @@ -1678,6 +1699,46 @@ impl NetworkInterface for Network { } } + async fn get_peers_by_services( + &self, + services: Services, + ) -> Result, NetworkError> { + let (output_tx, output_rx) = oneshot::channel(); + let connected_peers = self.get_peers(); + let mut filtered_peers = vec![]; + + // First we try to get the connected peers that support the desired services + for peer_id in connected_peers.iter() { + if let Some(peer_info) = self.get_peer_info(*peer_id) { + if peer_info.get_services().contains(services) { + filtered_peers.push(*peer_id); + } + } + } + + // If we dont have any connected peer that support the desired services, + // we tell the network to connect to new peers that support such services. + if connected_peers.is_empty() { + self.action_tx + .send(NetworkAction::ConnectPeersByServices { + services, + output: output_tx, + }) + .await?; + + filtered_peers.extend(output_rx.await?.iter()); + } + + // If filtered_peers is still empty at this point, + // it means that currently there is no peer that support the services in the network, + // so we return an error. + if filtered_peers.is_empty() { + return Err(NetworkError::PeersNotFound); + } + + Ok(filtered_peers) + } + async fn disconnect_peer(&self, peer_id: PeerId, _close_reason: CloseReason) { if let Err(error) = self .action_tx diff --git a/network-mock/src/network.rs b/network-mock/src/network.rs index cc590ff5f3..98f03e09ba 100644 --- a/network-mock/src/network.rs +++ b/network-mock/src/network.rs @@ -581,4 +581,11 @@ impl Network for MockNetwork { fn get_peer_info(&self, peer_id: Self::PeerId) -> Option { self.peers.read().get(&peer_id).cloned() } + + async fn get_peers_by_services( + &self, + _services: Services, + ) -> Result, MockNetworkError> { + Ok(self.get_peers()) + } } diff --git a/blockchain/src/history/history_tree_proof.rs b/primitives/transaction/src/history_proof.rs similarity index 94% rename from blockchain/src/history/history_tree_proof.rs rename to primitives/transaction/src/history_proof.rs index a8b217fc64..82c27b98a3 100644 --- a/blockchain/src/history/history_tree_proof.rs +++ b/primitives/transaction/src/history_proof.rs @@ -4,13 +4,14 @@ use beserial::{ }; use nimiq_hash::Blake2bHash; use nimiq_mmr::mmr::proof::Proof; -use nimiq_transaction::extended_transaction::ExtendedTransaction; + +use crate::extended_transaction::ExtendedTransaction; /// Struct containing a vector of extended transactions together with a Merkle proof for them. It /// allows one to prove/verify that specific transactions are part of the History Tree. pub struct HistoryTreeProof { - pub(crate) proof: Proof, - pub(crate) positions: Vec, + pub proof: Proof, + pub positions: Vec, pub history: Vec, } diff --git a/primitives/transaction/src/lib.rs b/primitives/transaction/src/lib.rs index 5a9675071b..1ed110d099 100644 --- a/primitives/transaction/src/lib.rs +++ b/primitives/transaction/src/lib.rs @@ -28,6 +28,7 @@ use crate::account::AccountTransactionVerification; pub mod account; pub mod extended_transaction; +pub mod history_proof; pub mod inherent; pub mod reward; diff --git a/web-client/Cargo.toml b/web-client/Cargo.toml index 7256920720..db544cb525 100644 --- a/web-client/Cargo.toml +++ b/web-client/Cargo.toml @@ -27,6 +27,7 @@ log = { package = "tracing", version = "0.1", features = ["log"] } serde-wasm-bindgen = "0.4" wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4" +js-sys = "0.3.61" beserial = { path = "../beserial", features = ["derive"] } nimiq-blockchain-interface = { path = "../blockchain-interface" } diff --git a/web-client/src/address.rs b/web-client/src/address.rs index e1ea081155..01d4ba9e4e 100644 --- a/web-client/src/address.rs +++ b/web-client/src/address.rs @@ -41,4 +41,8 @@ impl Address { pub fn native_ref(&self) -> &nimiq_keys::Address { &self.inner } + + pub fn get_native(&self) -> nimiq_keys::Address { + self.inner.clone() + } } diff --git a/web-client/src/lib.rs b/web-client/src/lib.rs index 6e90517d3f..b4ac8d3b4e 100644 --- a/web-client/src/lib.rs +++ b/web-client/src/lib.rs @@ -1,7 +1,10 @@ use std::str::FromStr; +use address::Address; use futures::StreamExt; use gloo_timers::future::IntervalStream; +use js_sys::Uint8Array; +use nimiq_transaction::ExecutedTransaction; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::spawn_local; @@ -343,6 +346,43 @@ impl WebClient { Ok(()) } + + /// This function is used to query the network for transactions from some specific adress, that + /// have been included in the chain. + /// The obtained transactions correspond to extended transactions. + /// They are verified before being returned. + #[wasm_bindgen(js_name = requestTransactionsFromAddress)] + pub async fn request_transations_from_address( + &self, + address: Address, + max: u16, + ) -> Result { + let transactions = self + .inner + .consensus_proxy() + .request_transactions_by_address(address.get_native(), Some(max)) + .await?; + + let executed_txs: Vec = transactions + .into_iter() + .map(|ext_transaction| ext_transaction.into_transaction().unwrap()) + .collect(); + + // TODO: We are converting executed transactions into a regular transaction, which loses the execution result of the txn + let js_transactions: Vec = executed_txs + .into_iter() + .map(|executed_tx| Transaction::from_native(executed_tx.get_raw_transaction().clone())) + .collect(); + + let mut js_values = vec![]; + + for js_transaction in js_transactions { + //We need to serialize the exported transaction + js_values.extend(js_transaction.serialize()); + } + + Ok(Uint8Array::from(js_values.as_slice())) + } } #[wasm_bindgen]