Skip to content

Commit

Permalink
Expose RequestTransactionsByAddress to JS
Browse files Browse the repository at this point in the history
  • Loading branch information
viquezclaudio committed Feb 23, 2023
1 parent 6098449 commit a9a9ee9
Show file tree
Hide file tree
Showing 15 changed files with 274 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions blockchain/src/history/history_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ use nimiq_mmr::mmr::proof::RangeProof;
use nimiq_mmr::mmr::MerkleMountainRange;
use nimiq_mmr::store::memory::MemoryStore;
use nimiq_primitives::policy::Policy;
use nimiq_transaction::history_proof::HistoryTreeProof;
use nimiq_transaction::{
extended_transaction::{ExtTxData, ExtendedTransaction},
inherent::InherentType,
};

use crate::history::{
mmr_store::MMRStore, ordered_hash::OrderedHash, HistoryTreeChunk, HistoryTreeProof,
};
use crate::history::{mmr_store::MMRStore, ordered_hash::OrderedHash, HistoryTreeChunk};

/// A struct that contains databases to store history trees (which are Merkle Mountain Ranges
/// constructed from the list of extended transactions in an epoch) and extended transactions (which
Expand Down
2 changes: 0 additions & 2 deletions blockchain/src/history/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
pub use history_store::HistoryStore;
pub use history_tree_chunk::{HistoryTreeChunk, CHUNK_SIZE};
pub use history_tree_proof::HistoryTreeProof;

mod history_store;
mod history_tree_chunk;
mod history_tree_proof;
mod mmr_store;
mod ordered_hash;
111 changes: 110 additions & 1 deletion consensus/src/consensus/consensus_proxy.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use nimiq_keys::Address;
use nimiq_network_interface::peer_info::Services;
use nimiq_network_interface::request::OutboundRequestError;
use nimiq_network_interface::request::RequestError;
use nimiq_primitives::policy::Policy;
use nimiq_transaction::extended_transaction::ExtendedTransaction;
use tokio::sync::broadcast::Sender as BroadcastSender;
use tokio_stream::wrappers::BroadcastStream;

use nimiq_blockchain_proxy::BlockchainProxy;
use nimiq_network_interface::network::Network;
use nimiq_network_interface::network::{CloseReason, Network};
use nimiq_primitives::account::AccountType;
use nimiq_transaction::{ControlTransactionTopic, Transaction, TransactionTopic};

use crate::messages::{RequestTransactionsByAddress, RequestTransactionsProof};
use crate::ConsensusEvent;

pub struct ConsensusProxy<N: Network> {
Expand Down Expand Up @@ -44,4 +52,105 @@ impl<N: Network> ConsensusProxy<N> {
pub fn subscribe_events(&self) -> BroadcastStream<ConsensusEvent> {
BroadcastStream::new(self.events.subscribe())
}

pub async fn request_transactions_by_address(
&self,
address: Address,
max: Option<u16>,
) -> Result<Vec<ExtendedTransaction>, RequestError> {
// First we tell the network to provide us with a vector that contains all the connected peers that support such services
// Note: If the network could not connect to any peer that satisfies our requirement, then an error would be returned
let peers = self
.network
.get_peers_by_services(Services::TRANSACTION_INDEX)
.await
.map_err(|error| {
log::error!(
err = %error,
"The transactions by address request couldn't be fulfilled"
);

RequestError::OutboundRequest(OutboundRequestError::SendError)
})?;

// At this point we obtained a list of connected peers that could satisfy our request,
// so we perform the request to each of those peers:
let mut verified_transactions = HashMap::new();

for peer_id in peers {
let response = self
.network
.request::<RequestTransactionsByAddress>(
RequestTransactionsByAddress {
address: address.clone(),
max,
},
peer_id,
)
.await;

match response {
Ok(response) => {
//Now we request proofs for each transaction we requested.
for transaction in response.transactions {
// If the transaction was already verified, then no need to verify it again
if verified_transactions.contains_key(&transaction.tx_hash()) {
continue;
}

let response = self
.network
.request::<RequestTransactionsProof>(
RequestTransactionsProof {
hashes: vec![transaction.tx_hash()],
epoch_number: Policy::epoch_at(transaction.block_number),
},
peer_id,
)
.await;
match response {
Ok(proof_response) => {
//We verify the transaction using the proof
if let Some(proof) = proof_response.proof {
if let Some(block) = proof_response.block {
let verification_result = proof
.verify(block.history_root().clone())
.map_or(false, |result| result);

if verification_result {
verified_transactions
.insert(transaction.tx_hash(), transaction);
} else {
//The proof didnt verify so we disconnect from this peer
self.network
.disconnect_peer(peer_id, CloseReason::Other)
.await;
break;
}
} else {
//If we receive a proof but we do not recieve a block we disconnect from the peer
self.network
.disconnect_peer(peer_id, CloseReason::Other)
.await;
break;
}
}
}
Err(error) => {
//If there was a request error with this peer we dont request anymore proofs from it
log::error!(peer=%peer_id, err=%error,"There was an error requesting transactions proof from peer");
break;
}
}
}
}
Err(error) => {
//If there was a request error with this peer we log an error
log::error!(peer=%peer_id, err=%error,"There was an error requesting transactions from peer");
}
}
}

Ok(verified_transactions.into_values().collect())
}
}
25 changes: 9 additions & 16 deletions consensus/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ use std::fmt::{Debug, Formatter};
use beserial::{Deserialize, Serialize};
use nimiq_block::{Block, MacroBlock};
#[cfg(feature = "full")]
use nimiq_blockchain::{HistoryTreeChunk, HistoryTreeProof};
use nimiq_blockchain::HistoryTreeChunk;
use nimiq_hash::Blake2bHash;
#[cfg(feature = "full")]
use nimiq_keys::Address;
use nimiq_network_interface::request::{RequestCommon, RequestMarker};
#[cfg(feature = "full")]
use nimiq_transaction::extended_transaction::ExtendedTransaction;
use nimiq_transaction::history_proof::HistoryTreeProof;

mod handlers;

Expand Down Expand Up @@ -211,47 +210,41 @@ impl RequestCommon for RequestHead {
const MAX_REQUESTS: u32 = MAX_REQUEST_RESPONSE_HEAD;
}

#[cfg(feature = "full")]
#[derive(Serialize, Deserialize)]
pub struct ResponseTransactionsProof {
proof: Option<HistoryTreeProof>,
block: Option<Block>,
pub proof: Option<HistoryTreeProof>,
pub block: Option<Block>,
}

#[cfg(feature = "full")]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestTransactionsProof {
#[beserial(len_type(u16, limit = 128))]
hashes: Vec<Blake2bHash>,
epoch_number: u32,
pub hashes: Vec<Blake2bHash>,
pub epoch_number: u32,
}

#[cfg(feature = "full")]
impl RequestCommon for RequestTransactionsProof {
type Kind = RequestMarker;
const TYPE_ID: u16 = 213;
type Response = ResponseTransactionsProof;
const MAX_REQUESTS: u32 = MAX_REQUEST_TRANSACTIONS_PROOF;
}

#[cfg(feature = "full")]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestTransactionsByAddress {
address: Address,
max: Option<u16>,
pub address: Address,
pub max: Option<u16>,
}

#[cfg(feature = "full")]
impl RequestCommon for RequestTransactionsByAddress {
type Kind = RequestMarker;
const TYPE_ID: u16 = 214;
type Response = ResponseTransactionsByAddress;
const MAX_REQUESTS: u32 = MAX_REQUEST_TRANSACTIONS_BY_ADDRESS;
}

#[cfg(feature = "full")]
#[derive(Serialize, Deserialize)]
pub struct ResponseTransactionsByAddress {
#[beserial(len_type(u16, limit = 128))]
transactions: Vec<ExtendedTransaction>,
pub transactions: Vec<ExtendedTransaction>,
}
6 changes: 6 additions & 0 deletions network-interface/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ pub trait Network: Send + Sync + Unpin + 'static {
/// If the peer isn't found, `None` is returned.
fn get_peer_info(&self, peer_id: Self::PeerId) -> Option<PeerInfo>;

/// Gets the set of connected peers that provide the supplied services
async fn get_peers_by_services(
&self,
services: Services,
) -> Result<Vec<Self::PeerId>, Self::Error>;

/// Returns true when the given peer provides the services flags that are required by us
fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool;

Expand Down
25 changes: 25 additions & 0 deletions network-libp2p/src/connection_pool/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,31 @@ impl ConnectionPoolBehaviour {
.choose_multiple(&mut thread_rng(), num_peers)
}

pub fn choose_peers_to_dial_by_services(&self, services: Services) -> Vec<PeerId> {
let num_peers = usize::min(
self.config.peer_count_desired - self.peer_ids.num_connected(),
self.config.dialing_count_max - self.peer_ids.num_dialing(),
);
let contacts = self.contacts.read();
let own_contact = contacts.get_own_contact();
let own_peer_id = own_contact.peer_id();

contacts
.query(services)
.filter_map(|contact| {
let peer_id = contact.peer_id();
if peer_id != own_peer_id
&& self.peer_ids.can_dial(peer_id)
&& contact.addresses().count() > 0
{
Some(*peer_id)
} else {
None
}
})
.choose_multiple(&mut thread_rng(), num_peers)
}

fn choose_seeds_to_dial(&self) -> Vec<Multiaddr> {
// We prefer to connect to non-seed peers. Thus, we only choose any seeds here if we're
// not already dialing any peers and at most one seed at a time.
Expand Down
3 changes: 3 additions & 0 deletions network-libp2p/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub enum NetworkError {
#[error("Network action was cancelled")]
Cancelled,

#[error("We coudnt find any peer that satisfies the desired services")]
PeersNotFound,

#[error("Serialization error: {0}")]
Serialization(#[from] beserial::SerializingError),

Expand Down
61 changes: 61 additions & 0 deletions network-libp2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ pub(crate) enum NetworkAction {
ListenOn {
listen_addresses: Vec<Multiaddr>,
},
ConnectPeersByServices {
services: Services,
output: oneshot::Sender<Vec<PeerId>>,
},
StartConnecting,
DisconnectPeer {
peer_id: PeerId,
Expand Down Expand Up @@ -1270,6 +1274,23 @@ impl Network {
NetworkAction::StartConnecting => {
swarm.behaviour_mut().pool.start_connecting();
}
NetworkAction::ConnectPeersByServices { services, output } => {
let peers_candidates = swarm
.behaviour_mut()
.pool
.choose_peers_to_dial_by_services(services);
let mut successful_peers = vec![];

for peer_id in peers_candidates {
if Swarm::dial(swarm, DialOpts::peer_id(peer_id).build()).is_ok() {
successful_peers.push(peer_id);
}
}

if output.send(successful_peers).is_err() {
error!("Could not send sucessful peers vector");
}
}
NetworkAction::DisconnectPeer { peer_id } => {
if swarm.disconnect_peer_id(peer_id).is_err() {
warn!(%peer_id, "Peer already closed");
Expand Down Expand Up @@ -1678,6 +1699,46 @@ impl NetworkInterface for Network {
}
}

async fn get_peers_by_services(
&self,
services: Services,
) -> Result<Vec<Self::PeerId>, NetworkError> {
let (output_tx, output_rx) = oneshot::channel();
let connected_peers = self.get_peers();
let mut filtered_peers = vec![];

// First we try to get the connected peers that support the desired services
for peer_id in connected_peers.iter() {
if let Some(peer_info) = self.get_peer_info(*peer_id) {
if peer_info.get_services().contains(services) {
filtered_peers.push(*peer_id);
}
}
}

// If we dont have any connected peer that support the desired services,
// we tell the network to connect to new peers that support such services.
if connected_peers.is_empty() {
self.action_tx
.send(NetworkAction::ConnectPeersByServices {
services,
output: output_tx,
})
.await?;

filtered_peers.extend(output_rx.await?.iter());
}

// If filtered_peers is still empty at this point,
// it means that currently there is no peer that support the services in the network,
// so we return an error.
if filtered_peers.is_empty() {
return Err(NetworkError::PeersNotFound);
}

Ok(filtered_peers)
}

async fn disconnect_peer(&self, peer_id: PeerId, _close_reason: CloseReason) {
if let Err(error) = self
.action_tx
Expand Down
7 changes: 7 additions & 0 deletions network-mock/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,4 +581,11 @@ impl Network for MockNetwork {
fn get_peer_info(&self, peer_id: Self::PeerId) -> Option<PeerInfo> {
self.peers.read().get(&peer_id).cloned()
}

async fn get_peers_by_services(
&self,
_services: Services,
) -> Result<Vec<Self::PeerId>, MockNetworkError> {
Ok(self.get_peers())
}
}
Loading

0 comments on commit a9a9ee9

Please sign in to comment.