Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose getTransactionsByAddress to JS #1356

Merged
merged 3 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions blockchain/src/blockchain/history_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ use nimiq_blockchain_interface::{
use nimiq_database::WriteTransaction;
use nimiq_primitives::coin::Coin;
use nimiq_primitives::policy::Policy;
use nimiq_transaction::extended_transaction::{ExtTxData, ExtendedTransaction};
use nimiq_transaction::inherent::{Inherent, InherentType};
use nimiq_transaction::Transaction;
use nimiq_transaction::{
extended_transaction::{ExtTxData, ExtendedTransaction},
inherent::{Inherent, InherentType},
Transaction,
};

use crate::Blockchain;

Expand Down
5 changes: 2 additions & 3 deletions blockchain/src/history/history_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ use nimiq_mmr::store::memory::MemoryStore;
use nimiq_primitives::policy::Policy;
use nimiq_transaction::{
extended_transaction::{ExtTxData, ExtendedTransaction},
history_proof::HistoryTreeProof,
inherent::InherentType,
};

use crate::history::{
mmr_store::MMRStore, ordered_hash::OrderedHash, HistoryTreeChunk, HistoryTreeProof,
};
use crate::history::{mmr_store::MMRStore, ordered_hash::OrderedHash, HistoryTreeChunk};

/// A struct that contains databases to store history trees (which are Merkle Mountain Ranges
/// constructed from the list of extended transactions in an epoch) and extended transactions (which
Expand Down
2 changes: 0 additions & 2 deletions blockchain/src/history/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
pub use history_store::HistoryStore;
pub use history_tree_chunk::{HistoryTreeChunk, CHUNK_SIZE};
pub use history_tree_proof::HistoryTreeProof;

mod history_store;
mod history_tree_chunk;
mod history_tree_proof;
mod mmr_store;
mod ordered_hash;
123 changes: 121 additions & 2 deletions consensus/src/consensus/consensus_proxy.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use tokio::sync::broadcast::Sender as BroadcastSender;
use tokio_stream::wrappers::BroadcastStream;

use nimiq_blockchain_proxy::BlockchainProxy;
use nimiq_network_interface::network::Network;
use nimiq_keys::Address;
use nimiq_network_interface::{
network::{CloseReason, Network},
peer_info::Services,
request::{OutboundRequestError, RequestError},
};
use nimiq_primitives::account::AccountType;
use nimiq_transaction::{ControlTransactionTopic, Transaction, TransactionTopic};
use nimiq_transaction::{
extended_transaction::ExtendedTransaction, ControlTransactionTopic, Transaction,
TransactionTopic,
};

use crate::messages::{RequestTransactionsByAddress, RequestTransactionsProof};
use crate::ConsensusEvent;

pub struct ConsensusProxy<N: Network> {
Expand Down Expand Up @@ -44,4 +54,113 @@ impl<N: Network> ConsensusProxy<N> {
pub fn subscribe_events(&self) -> BroadcastStream<ConsensusEvent> {
BroadcastStream::new(self.events.subscribe())
}

pub async fn request_transactions_by_address(
&self,
address: Address,
min_peers: usize,
max: Option<u16>,
) -> Result<Vec<ExtendedTransaction>, RequestError> {
// First we tell the network to provide us with a vector that contains all the connected peers that support such services
// Note: If the network could not provide enough peers that satisfies our requirement, then an error would be returned
let peers = self
.network
.get_peers_by_services(Services::TRANSACTION_INDEX, min_peers)
.await
.map_err(|error| {
log::error!(
err = %error,
"The transactions by address request couldn't be fulfilled"
);

RequestError::OutboundRequest(OutboundRequestError::SendError)
})?;

let mut verified_transactions = HashMap::new();

// At this point we obtained a list of connected peers that could satisfy our request,
// so we perform the request to each of those peers:
for peer_id in peers {
let response = self
.network
.request::<RequestTransactionsByAddress>(
RequestTransactionsByAddress {
address: address.clone(),
max,
},
peer_id,
)
.await;

match response {
Ok(response) => {
// Now we request proofs for each transaction we requested.
for transaction in response.transactions {
// If the transaction was already verified, then we don't need to verify it again
if verified_transactions.contains_key(&transaction.tx_hash()) {
continue;
}

let response = self
.network
.request::<RequestTransactionsProof>(
RequestTransactionsProof {
hashes: vec![transaction.tx_hash()],
block_number: transaction.block_number,
},
peer_id,
)
.await;
match response {
Ok(proof_response) => {
// We verify the transaction using the proof
if let Some(proof) = proof_response.proof {
if let Some(block) = proof_response.block {
// TODO: We are currently assuming that the provided block was included in the chain
// but we also need some additional information to prove the block is part of the chain.
let verification_result = proof
.verify(block.history_root().clone())
.map_or(false, |result| result);

if verification_result {
for tx in proof.history {
verified_transactions.insert(tx.tx_hash(), tx);
}
} else {
// The proof didn't verify so we disconnect from this peer
log::debug!(peer=%peer_id,"Disconnecting from peer because the transaction proof didn't verify");
self.network
.disconnect_peer(peer_id, CloseReason::Other)
.await;
break;
}
} else {
// If we receive a proof but we do not recieve a block, we disconnect from the peer
log::debug!(peer=%peer_id,"Disconnecting from peer due to an inconsistency in the transaction proof response");
self.network
.disconnect_peer(peer_id, CloseReason::Other)
.await;
break;
}
} else {
log::debug!(peer=%peer_id, "We requested a transaction proof but the peer didn't provide any");
}
}
Err(error) => {
// If there was a request error with this peer we don't request anymore proofs from it
log::error!(peer=%peer_id, err=%error,"There was an error requesting transactions proof from peer");
break;
}
}
}
}
Err(error) => {
// If there was a request error with this peer we log an error
log::error!(peer=%peer_id, err=%error,"There was an error requesting transactions from peer");
}
}
}

Ok(verified_transactions.into_values().collect())
}
}
7 changes: 6 additions & 1 deletion consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use nimiq_zkp_component::zkp_component::ZKPComponentProxy;

use crate::consensus::head_requests::{HeadRequests, HeadRequestsResult};
#[cfg(feature = "full")]
use crate::messages::{RequestBatchSet, RequestHistoryChunk, RequestTransactionsProof};
use crate::messages::{
RequestBatchSet, RequestHistoryChunk, RequestTransactionsByAddress, RequestTransactionsProof,
};
use crate::messages::{RequestBlock, RequestHead, RequestMacroChain, RequestMissingBlocks};
#[cfg(feature = "full")]
use crate::sync::live::state_queue::RequestChunk;
Expand Down Expand Up @@ -155,6 +157,9 @@ impl<N: Network> Consensus<N> {

let stream = network.receive_requests::<RequestTransactionsProof>();
executor.exec(Box::pin(request_handler(network, stream, blockchain)));

let stream = network.receive_requests::<RequestTransactionsByAddress>();
executor.exec(Box::pin(request_handler(network, stream, blockchain)));
}
BlockchainProxy::Light(_) => {}
}
Expand Down
55 changes: 38 additions & 17 deletions consensus/src/messages/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,24 +307,45 @@ impl Handle<ResponseTransactionsProof, Arc<RwLock<Blockchain>>> for RequestTrans
let blockchain = blockchain.read();
let hashes = self.hashes.iter().collect();

let proof = blockchain
.history_store
.prove(self.epoch_number, hashes, None);

let block_height = self.epoch_number * Policy::blocks_per_epoch();

let block = blockchain
.chain_store
.get_block_at(block_height, false, None)
.ok()
.and_then(|block| {
if block.is_macro() {
// We expect a macro block
Some(block)
} else {
None
let mut block = None;

let proof =
blockchain
.history_store
.prove(Policy::epoch_at(self.block_number), hashes, None);

// If we obtained a proof, we need to supply the corresponding block
if proof.is_some() {
// If the block_number to proof is in a finalized epoch, use the epoch's finalization block
let election_block_number = if Policy::is_election_block_at(self.block_number) {
self.block_number
} else {
Policy::election_block_after(self.block_number)
};

if election_block_number <= blockchain.block_number() {
block = blockchain
.chain_store
.get_block_at(election_block_number, false, None)
.ok();
} else {
// Otherwise, the transaction proof was made at the current history store state, so return the current
// head block to prove it.
let mut head = blockchain.head();

// Convert to light block by removing the block body
match head {
Block::Macro(ref mut block) => block.body = None,
Block::Micro(ref mut block) => block.body = None,
}
});

block = Some(head);
}

if block.is_none() {
log::error!("We are supplying a transaction proof but not a block, which can be interpreted as malicious behaviour");
jsdanielh marked this conversation as resolved.
Show resolved Hide resolved
}
}

ResponseTransactionsProof { proof, block }
}
Expand Down
28 changes: 11 additions & 17 deletions consensus/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::fmt::{Debug, Formatter};
use beserial::{Deserialize, Serialize};
use nimiq_block::{Block, MacroBlock};
#[cfg(feature = "full")]
use nimiq_blockchain::{HistoryTreeChunk, HistoryTreeProof};
use nimiq_blockchain::HistoryTreeChunk;
use nimiq_hash::Blake2bHash;
#[cfg(feature = "full")]
use nimiq_keys::Address;
use nimiq_network_interface::request::{RequestCommon, RequestMarker};
#[cfg(feature = "full")]
use nimiq_transaction::extended_transaction::ExtendedTransaction;
use nimiq_transaction::{
extended_transaction::ExtendedTransaction, history_proof::HistoryTreeProof,
};

mod handlers;

Expand Down Expand Up @@ -211,47 +211,41 @@ impl RequestCommon for RequestHead {
const MAX_REQUESTS: u32 = MAX_REQUEST_RESPONSE_HEAD;
}

#[cfg(feature = "full")]
#[derive(Serialize, Deserialize)]
pub struct ResponseTransactionsProof {
proof: Option<HistoryTreeProof>,
block: Option<Block>,
pub proof: Option<HistoryTreeProof>,
pub block: Option<Block>,
}

#[cfg(feature = "full")]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestTransactionsProof {
#[beserial(len_type(u16, limit = 128))]
hashes: Vec<Blake2bHash>,
epoch_number: u32,
pub hashes: Vec<Blake2bHash>,
pub block_number: u32,
}

#[cfg(feature = "full")]
impl RequestCommon for RequestTransactionsProof {
type Kind = RequestMarker;
const TYPE_ID: u16 = 213;
type Response = ResponseTransactionsProof;
const MAX_REQUESTS: u32 = MAX_REQUEST_TRANSACTIONS_PROOF;
}

#[cfg(feature = "full")]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestTransactionsByAddress {
address: Address,
max: Option<u16>,
pub address: Address,
pub max: Option<u16>,
}

#[cfg(feature = "full")]
impl RequestCommon for RequestTransactionsByAddress {
type Kind = RequestMarker;
const TYPE_ID: u16 = 214;
type Response = ResponseTransactionsByAddress;
const MAX_REQUESTS: u32 = MAX_REQUEST_TRANSACTIONS_BY_ADDRESS;
}

#[cfg(feature = "full")]
#[derive(Serialize, Deserialize)]
pub struct ResponseTransactionsByAddress {
#[beserial(len_type(u16, limit = 128))]
transactions: Vec<ExtendedTransaction>,
pub transactions: Vec<ExtendedTransaction>,
}
10 changes: 10 additions & 0 deletions network-interface/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ pub trait Network: Send + Sync + Unpin + 'static {
/// If the peer isn't found, `None` is returned.
fn get_peer_info(&self, peer_id: Self::PeerId) -> Option<PeerInfo>;

/// Gets the set of connected peers that provide the supplied services.
/// If we currently don't have min number of connected peer that provides those services,
/// we dial peers.
/// If there aren't enough peers in the network that provides the required services, we return an error
async fn get_peers_by_services(
&self,
services: Services,
jsdanielh marked this conversation as resolved.
Show resolved Hide resolved
min_peers: usize,
) -> Result<Vec<Self::PeerId>, Self::Error>;

/// Returns true when the given peer provides the services flags that are required by us
fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool;

Expand Down
28 changes: 28 additions & 0 deletions network-libp2p/src/connection_pool/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,34 @@ impl ConnectionPoolBehaviour {
.choose_multiple(&mut thread_rng(), num_peers)
}

/// This function is used to select a list of peers, based on services flag, in order to dial them.
/// `num_peers` is used to specify how many peers are selected
/// The number of peers returned equals num_peers unless there are less available peers
pub fn choose_peers_to_dial_by_services(
jsdanielh marked this conversation as resolved.
Show resolved Hide resolved
&self,
services: Services,
num_peers: usize,
) -> Vec<PeerId> {
let contacts = self.contacts.read();
let own_contact = contacts.get_own_contact();
let own_peer_id = own_contact.peer_id();

contacts
.query(services)
.filter_map(|contact| {
let peer_id = contact.peer_id();
if peer_id != own_peer_id
&& self.peer_ids.can_dial(peer_id)
&& contact.addresses().count() > 0
{
Some(*peer_id)
} else {
None
}
})
.choose_multiple(&mut thread_rng(), num_peers)
}

fn choose_seeds_to_dial(&self) -> Vec<Multiaddr> {
// We prefer to connect to non-seed peers. Thus, we only choose any seeds here if we're
// not already dialing any peers and at most one seed at a time.
Expand Down
Loading