Skip to content

Commit

Permalink
Merge pull request #3178 from AleoHQ/unique-transmissions
Browse files Browse the repository at this point in the history
Skip proposing duplicated transmissions
  • Loading branch information
howardwu authored Mar 22, 2024
2 parents 689ce48 + 915ae93 commit 03e7928
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 33 deletions.
146 changes: 119 additions & 27 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ pub struct Primary<N: Network> {
}

impl<N: Network> Primary<N> {
/// The maximum number of unconfirmed transmissions to send to the primary.
pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

/// Initializes a new primary instance.
pub fn new(
account: Account<N>,
Expand Down Expand Up @@ -394,38 +397,61 @@ impl<N: Network> Primary<N> {
let mut num_transactions = 0;
// Take the transmissions from the workers.
for worker in self.workers.iter() {
for (id, transmission) in worker.drain(num_transmissions_per_worker) {
// Check if the ledger already contains the transmission.
if self.ledger.contains_transmission(&id).unwrap_or(true) {
trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
continue;
// Initialize a tracker for included transmissions for the current worker.
let mut num_transmissions_included_for_worker = 0;
// Keep draining the worker until the desired number of transmissions is reached or the worker is empty.
'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker {
// Determine the number of remaining transmissions for the worker.
let num_remaining_transmissions =
num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker);
// Drain the worker.
let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable();
// If the worker is empty, break early.
if worker_transmissions.peek().is_none() {
break 'outer;
}
// Check the transmission is still valid.
match (id, transmission.clone()) {
(TransmissionID::Solution(solution_id), Transmission::Solution(solution)) => {
// Check if the solution is still valid.
if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
continue;
}
// Iterate through the worker transmissions.
'inner: for (id, transmission) in worker_transmissions {
// Check if the ledger already contains the transmission.
if self.ledger.contains_transmission(&id).unwrap_or(true) {
trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
continue 'inner;
}
(TransmissionID::Transaction(transaction_id), Transmission::Transaction(transaction)) => {
// Check if the transaction is still valid.
if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
continue;
// Check if the storage already contain the transmission.
// Note: We do not skip if this is the first transmission in the proposal, to ensure that
// the primary does not propose a batch with no transmissions.
if !transmissions.is_empty() && self.storage.contains_transmission(id) {
trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
continue 'inner;
}
// Check the transmission is still valid.
match (id, transmission.clone()) {
(TransmissionID::Solution(solution_id), Transmission::Solution(solution)) => {
// Check if the solution is still valid.
if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
continue 'inner;
}
}
(TransmissionID::Transaction(transaction_id), Transmission::Transaction(transaction)) => {
// Check if the transaction is still valid.
if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
continue 'inner;
}
// Increment the number of transactions.
num_transactions += 1;
}
// Increment the number of transactions.
num_transactions += 1;
// Note: We explicitly forbid including ratifications,
// as the protocol currently does not support ratifications.
(TransmissionID::Ratification, Transmission::Ratification) => continue,
// All other combinations are clearly invalid.
_ => continue 'inner,
}
// Note: We explicitly forbid including ratifications,
// as the protocol currently does not support ratifications.
(TransmissionID::Ratification, Transmission::Ratification) => continue,
// All other combinations are clearly invalid.
_ => continue,
// Insert the transmission into the map.
transmissions.insert(id, transmission);
num_transmissions_included_for_worker += 1;
}
// Insert the transmission into the map.
transmissions.insert(id, transmission);
}
}
// If there are no unconfirmed transmissions to propose, return early.
Expand Down Expand Up @@ -1761,6 +1787,72 @@ mod tests {
assert!(primary.proposed_batch.read().is_some());
}

#[tokio::test]
async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
let round = 3;
let prev_round = round - 1;
let mut rng = TestRng::default();
let (primary, accounts) = primary_without_handlers(&mut rng).await;
let peer_account = &accounts[1];
let peer_ip = peer_account.0;

// Fill primary storage.
store_certificate_chain(&primary, &accounts, round, &mut rng);

// Get transmissions from previous certificates.
let previous_certificate_ids: IndexSet<_> =
primary.storage.get_certificates_for_round(prev_round).iter().map(|cert| cert.id()).collect();

// Track the number of transmissions in the previous round.
let mut num_transmissions_in_previous_round = 0;

// Generate a solution and a transaction.
let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

// Store it on one of the workers.
primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

// Check that the worker has 2 transmissions.
assert_eq!(primary.workers[0].num_transmissions(), 2);

// Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
for (_, account) in accounts.iter() {
let (certificate, transmissions) = create_batch_certificate(
account.address(),
&accounts,
round,
previous_certificate_ids.clone(),
&mut rng,
);

// Add the transmissions to the worker.
for (transmission_id, transmission) in transmissions.iter() {
primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
}

// Insert the certificate to storage.
num_transmissions_in_previous_round += transmissions.len();
primary.storage.insert_certificate(certificate, transmissions).unwrap();
}

// Advance to the next round.
assert!(primary.storage.increment_to_next_round(round).is_ok());

// Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

// Propose the batch.
assert!(primary.propose_batch().await.is_ok());

// Check that the proposal only contains the new transmissions that were not in previous certificates.
let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
assert_eq!(proposed_transmissions.len(), 2);
assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment)));
assert!(proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id)));
}

#[tokio::test]
async fn test_batch_propose_from_peer() {
let mut rng = TestRng::default();
Expand Down
16 changes: 10 additions & 6 deletions node/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use snarkos_node_bft::{
Storage as NarwhalStorage,
},
spawn_blocking,
Primary,
BFT,
};
use snarkos_node_bft_ledger_service::LedgerService;
Expand Down Expand Up @@ -238,14 +239,17 @@ impl<N: Network> Consensus<N> {
}

// If the memory pool of this node is full, return early.
let num_unconfirmed = self.num_unconfirmed_transmissions();
if num_unconfirmed > N::MAX_SOLUTIONS || num_unconfirmed > BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
|| num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
{
return Ok(());
}
// Retrieve the solutions.
let solutions = {
// Determine the available capacity.
let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed);
let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
// Acquire the lock on the queue.
let mut queue = self.solutions_queue.lock();
// Determine the number of solutions to send.
Expand Down Expand Up @@ -304,14 +308,14 @@ impl<N: Network> Consensus<N> {
}

// If the memory pool of this node is full, return early.
let num_unconfirmed = self.num_unconfirmed_transmissions();
if num_unconfirmed > BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
return Ok(());
}
// Retrieve the transactions.
let transactions = {
// Determine the available capacity.
let capacity = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH.saturating_sub(num_unconfirmed);
let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
// Acquire the lock on the transactions queue.
let mut tx_queue = self.transactions_queue.lock();
// Determine the number of deployments to send.
Expand Down

0 comments on commit 03e7928

Please sign in to comment.