From 51aa1f3f571c366157f6b7272e446e42d7969b5f Mon Sep 17 00:00:00 2001 From: raychu86 <14917648+raychu86@users.noreply.github.com> Date: Wed, 20 Mar 2024 16:06:05 -0700 Subject: [PATCH 1/8] Skip proposing transmissions that are in previous certificates --- node/bft/src/primary.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index f91cd52bf4..2ef40b2080 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -360,6 +360,10 @@ impl Primary { let previous_round = round.saturating_sub(1); // Retrieve the previous certificates. let previous_certificates = self.storage.get_certificates_for_round(previous_round); + // Retrieve the transmissions included in the previous certificates. + let previous_certificate_transmissions = cfg_iter!(previous_certificates) + .flat_map(|certificate| certificate.transmission_ids()) + .collect::>(); // Check if the batch is ready to be proposed. // Note: The primary starts at round 1, and round 0 contains no certificates, by definition. @@ -400,6 +404,11 @@ impl Primary { trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id)); continue; } + // Check if the previous certificates already contain the transmission. + if previous_certificate_transmissions.contains(&id) { + trace!("Proposing - Skipping transmission '{}' - Already in previous certificates", fmt_id(id)); + continue; + } // Check the transmission is still valid. match (id, transmission.clone()) { (TransmissionID::Solution(solution_id), Transmission::Solution(solution)) => { From 639a556e4b2a31eea8c5d8e6e50f27de51aa2957 Mon Sep 17 00:00:00 2001 From: raychu86 <14917648+raychu86@users.noreply.github.com> Date: Wed, 20 Mar 2024 16:09:34 -0700 Subject: [PATCH 2/8] Drain more transmissions from the worker if there aren't enough --- node/bft/src/primary.rs | 80 ++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 32 deletions(-) diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 2ef40b2080..677b18ebda 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -398,43 +398,59 @@ impl Primary { let mut num_transactions = 0; // Take the transmissions from the workers. for worker in self.workers.iter() { - for (id, transmission) in worker.drain(num_transmissions_per_worker) { - // Check if the ledger already contains the transmission. - if self.ledger.contains_transmission(&id).unwrap_or(true) { - trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id)); - continue; - } - // Check if the previous certificates already contain the transmission. - if previous_certificate_transmissions.contains(&id) { - trace!("Proposing - Skipping transmission '{}' - Already in previous certificates", fmt_id(id)); - continue; + // Initialize a tracker for included transmissions for the current worker. + let mut num_transmissions_included_for_worker = 0; + // Keep draining the worker until the desired number of transmissions is reached or the worker is empty. + 'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker { + // Determine the number of remaining transmissions for the worker. + let num_remaining_transmissions = + num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker); + // Drain the worker. + let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable(); + // If the worker is empty, break early. + if worker_transmissions.peek().is_none() { + break 'outer; } - // Check the transmission is still valid. - match (id, transmission.clone()) { - (TransmissionID::Solution(solution_id), Transmission::Solution(solution)) => { - // Check if the solution is still valid. - if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await { - trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id)); - continue; - } + // Iterate through the worker transmissions. + 'inner: for (id, transmission) in worker_transmissions { + // Check if the ledger already contains the transmission. + if self.ledger.contains_transmission(&id).unwrap_or(true) { + trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id)); + continue 'inner; } - (TransmissionID::Transaction(transaction_id), Transmission::Transaction(transaction)) => { - // Check if the transaction is still valid. - if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await { - trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id)); - continue; + // Check if the previous certificates already contain the transmission. + if previous_certificate_transmissions.contains(&id) { + trace!("Proposing - Skipping transmission '{}' - Already in previous certificates", fmt_id(id)); + continue 'inner; + } + // Check the transmission is still valid. + match (id, transmission.clone()) { + (TransmissionID::Solution(solution_id), Transmission::Solution(solution)) => { + // Check if the solution is still valid. + if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await { + trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id)); + continue 'inner; + } + } + (TransmissionID::Transaction(transaction_id), Transmission::Transaction(transaction)) => { + // Check if the transaction is still valid. + if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await { + trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id)); + continue 'inner; + } + // Increment the number of transactions. + num_transactions += 1; } - // Increment the number of transactions. - num_transactions += 1; + // Note: We explicitly forbid including ratifications, + // as the protocol currently does not support ratifications. + (TransmissionID::Ratification, Transmission::Ratification) => continue, + // All other combinations are clearly invalid. + _ => continue 'inner, } - // Note: We explicitly forbid including ratifications, - // as the protocol currently does not support ratifications. - (TransmissionID::Ratification, Transmission::Ratification) => continue, - // All other combinations are clearly invalid. - _ => continue, + // Insert the transmission into the map. + transmissions.insert(id, transmission); + num_transmissions_included_for_worker += 1; } - // Insert the transmission into the map. - transmissions.insert(id, transmission); } } // If there are no unconfirmed transmissions to propose, return early. From c79299d98279fe8377119874d5f92b0c4b5235f8 Mon Sep 17 00:00:00 2001 From: raychu86 <14917648+raychu86@users.noreply.github.com> Date: Wed, 20 Mar 2024 16:54:04 -0700 Subject: [PATCH 3/8] Add test for skipping transmission from previous certificates --- node/bft/src/primary.rs | 66 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 677b18ebda..5a5899f4b1 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -1786,6 +1786,72 @@ mod tests { assert!(primary.proposed_batch.read().is_some()); } + #[tokio::test] + async fn test_propose_batch_skip_transmissions_from_previous_certificates() { + let round = 3; + let prev_round = round - 1; + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng).await; + let peer_account = &accounts[1]; + let peer_ip = peer_account.0; + + // Fill primary storage. + store_certificate_chain(&primary, &accounts, round, &mut rng); + + // Get transmissions from previous certificates. + let previous_certificate_ids: IndexSet<_> = + primary.storage.get_certificates_for_round(prev_round).iter().map(|cert| cert.id()).collect(); + + // Track the number of transmissions in the previous round. + let mut num_transmissions_in_previous_round = 0; + + // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage. + for (_, account) in accounts.iter() { + let (certificate, transmissions) = create_batch_certificate( + account.address(), + &accounts, + round, + previous_certificate_ids.clone(), + &mut rng, + ); + + // Add the transmissions to the worker. + for (transmission_id, transmission) in transmissions.iter() { + primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()); + } + + // Insert the certificate to storage. + num_transmissions_in_previous_round += transmissions.len(); + primary.storage.insert_certificate(certificate, transmissions).unwrap(); + } + + // Check that the worker has `num_transmissions_in_previous_round` transmissions. + assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round); + + // Advance to the next round. + assert!(primary.storage.increment_to_next_round(round).is_ok()); + + // Generate a solution and a transaction. + let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng); + let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng); + + // Store it on one of the workers. + primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap(); + primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap(); + + // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions. + assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2); + + // Propose the batch. + assert!(primary.propose_batch().await.is_ok()); + + // Check that the proposal only contains the new transmissions that were not in previous certificates. + let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone(); + assert_eq!(proposed_transmissions.len(), 2); + assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment))); + assert!(proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id))); + } + #[tokio::test] async fn test_batch_propose_from_peer() { let mut rng = TestRng::default(); From 1d5bf66373d0205eed05a55737f19b0ac1f13199 Mon Sep 17 00:00:00 2001 From: raychu86 <14917648+raychu86@users.noreply.github.com> Date: Wed, 20 Mar 2024 18:06:18 -0700 Subject: [PATCH 4/8] Add new maximum for transmissions sent to memory pool --- node/bft/src/primary.rs | 3 +++ node/consensus/src/lib.rs | 5 +++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 5a5899f4b1..f524918f34 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -99,6 +99,9 @@ pub struct Primary { } impl Primary { + /// The maximum number of unconfirmed transmissions to send to the primary. + pub const MAX_TRANSMISSIONS_IN_MEMORY_POOL: usize = BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH * 2; + /// Initializes a new primary instance. pub fn new( account: Account, diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs index 75b02e89e0..9e76ec452b 100644 --- a/node/consensus/src/lib.rs +++ b/node/consensus/src/lib.rs @@ -28,6 +28,7 @@ use snarkos_node_bft::{ Storage as NarwhalStorage, }, spawn_blocking, + Primary, BFT, }; use snarkos_node_bft_ledger_service::LedgerService; @@ -305,13 +306,13 @@ impl Consensus { // If the memory pool of this node is full, return early. let num_unconfirmed = self.num_unconfirmed_transmissions(); - if num_unconfirmed > BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH { + if num_unconfirmed > Primary::::MAX_TRANSMISSIONS_IN_MEMORY_POOL { return Ok(()); } // Retrieve the transactions. let transactions = { // Determine the available capacity. - let capacity = BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH.saturating_sub(num_unconfirmed); + let capacity = Primary::::MAX_TRANSMISSIONS_IN_MEMORY_POOL.saturating_sub(num_unconfirmed); // Acquire the lock on the transactions queue. let mut tx_queue = self.transactions_queue.lock(); // Determine the number of deployments to send. From 2ea4cc7e6d1b13f716ec0a5cf080ed57ad670724 Mon Sep 17 00:00:00 2001 From: raychu86 <14917648+raychu86@users.noreply.github.com> Date: Wed, 20 Mar 2024 18:54:23 -0700 Subject: [PATCH 5/8] Keep at least one transmission in the proposal --- node/bft/src/primary.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index f524918f34..16c47d5671 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -363,10 +363,6 @@ impl Primary { let previous_round = round.saturating_sub(1); // Retrieve the previous certificates. let previous_certificates = self.storage.get_certificates_for_round(previous_round); - // Retrieve the transmissions included in the previous certificates. - let previous_certificate_transmissions = cfg_iter!(previous_certificates) - .flat_map(|certificate| certificate.transmission_ids()) - .collect::>(); // Check if the batch is ready to be proposed. // Note: The primary starts at round 1, and round 0 contains no certificates, by definition. @@ -421,9 +417,11 @@ impl Primary { trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id)); continue 'inner; } - // Check if the previous certificates already contain the transmission. - if previous_certificate_transmissions.contains(&id) { - trace!("Proposing - Skipping transmission '{}' - Already in previous certificates", fmt_id(id)); + // Check if the storage already contain the transmission. + // Note: We do not skip if this is the first transmission in the proposal, to ensure that + // the primary does not propose a batch with no transmissions. + if !transmissions.is_empty() && self.storage.contains_transmission(id) { + trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id)); continue 'inner; } // Check the transmission is still valid. From e074e0e158cf3e7907ee77f5146026a50c2c5ecc Mon Sep 17 00:00:00 2001 From: raychu86 <14917648+raychu86@users.noreply.github.com> Date: Wed, 20 Mar 2024 18:56:55 -0700 Subject: [PATCH 6/8] Cleanup test --- node/bft/src/primary.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 16c47d5671..611a3d5c5d 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -1806,6 +1806,17 @@ mod tests { // Track the number of transmissions in the previous round. let mut num_transmissions_in_previous_round = 0; + // Generate a solution and a transaction. + let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng); + let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng); + + // Store it on one of the workers. + primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap(); + primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap(); + + // Check that the worker has 2 transmissions. + assert_eq!(primary.workers[0].num_transmissions(), 2); + // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage. for (_, account) in accounts.iter() { let (certificate, transmissions) = create_batch_certificate( @@ -1826,20 +1837,9 @@ mod tests { primary.storage.insert_certificate(certificate, transmissions).unwrap(); } - // Check that the worker has `num_transmissions_in_previous_round` transmissions. - assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round); - // Advance to the next round. assert!(primary.storage.increment_to_next_round(round).is_ok()); - // Generate a solution and a transaction. - let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng); - let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng); - - // Store it on one of the workers. - primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap(); - primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap(); - // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions. assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2); From 46deaad62cc550fd7adea76e40257b018ebc858c Mon Sep 17 00:00:00 2001 From: raychu86 <14917648+raychu86@users.noreply.github.com> Date: Fri, 22 Mar 2024 12:22:26 -0700 Subject: [PATCH 7/8] Update num_unconfirmed checks --- node/bft/src/primary.rs | 2 +- node/consensus/src/lib.rs | 15 +++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 611a3d5c5d..d954c98a6d 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -100,7 +100,7 @@ pub struct Primary { impl Primary { /// The maximum number of unconfirmed transmissions to send to the primary. - pub const MAX_TRANSMISSIONS_IN_MEMORY_POOL: usize = BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH * 2; + pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH * 2; /// Initializes a new primary instance. pub fn new( diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs index 9e76ec452b..c4ec813be1 100644 --- a/node/consensus/src/lib.rs +++ b/node/consensus/src/lib.rs @@ -239,14 +239,17 @@ impl Consensus { } // If the memory pool of this node is full, return early. - let num_unconfirmed = self.num_unconfirmed_transmissions(); - if num_unconfirmed > N::MAX_SOLUTIONS || num_unconfirmed > BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH { + let num_unconfirmed_solutions = self.num_unconfirmed_solutions(); + let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions(); + if num_unconfirmed_solutions > N::MAX_SOLUTIONS + || num_unconfirmed_transmissions > Primary::::MAX_TRANSMISSIONS_TOLERANCE + { return Ok(()); } // Retrieve the solutions. let solutions = { // Determine the available capacity. - let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed); + let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions); // Acquire the lock on the queue. let mut queue = self.solutions_queue.lock(); // Determine the number of solutions to send. @@ -305,14 +308,14 @@ impl Consensus { } // If the memory pool of this node is full, return early. - let num_unconfirmed = self.num_unconfirmed_transmissions(); - if num_unconfirmed > Primary::::MAX_TRANSMISSIONS_IN_MEMORY_POOL { + let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions(); + if num_unconfirmed_transmissions > Primary::::MAX_TRANSMISSIONS_TOLERANCE { return Ok(()); } // Retrieve the transactions. let transactions = { // Determine the available capacity. - let capacity = Primary::::MAX_TRANSMISSIONS_IN_MEMORY_POOL.saturating_sub(num_unconfirmed); + let capacity = Primary::::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions); // Acquire the lock on the transactions queue. let mut tx_queue = self.transactions_queue.lock(); // Determine the number of deployments to send. From da996bd81bb6c234ce8e4c9eb35b3aebddac94b6 Mon Sep 17 00:00:00 2001 From: raychu86 <14917648+raychu86@users.noreply.github.com> Date: Fri, 22 Mar 2024 12:24:52 -0700 Subject: [PATCH 8/8] nit --- node/consensus/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs index c4ec813be1..7fcab5b00e 100644 --- a/node/consensus/src/lib.rs +++ b/node/consensus/src/lib.rs @@ -241,8 +241,8 @@ impl Consensus { // If the memory pool of this node is full, return early. let num_unconfirmed_solutions = self.num_unconfirmed_solutions(); let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions(); - if num_unconfirmed_solutions > N::MAX_SOLUTIONS - || num_unconfirmed_transmissions > Primary::::MAX_TRANSMISSIONS_TOLERANCE + if num_unconfirmed_solutions >= N::MAX_SOLUTIONS + || num_unconfirmed_transmissions >= Primary::::MAX_TRANSMISSIONS_TOLERANCE { return Ok(()); } @@ -309,7 +309,7 @@ impl Consensus { // If the memory pool of this node is full, return early. let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions(); - if num_unconfirmed_transmissions > Primary::::MAX_TRANSMISSIONS_TOLERANCE { + if num_unconfirmed_transmissions >= Primary::::MAX_TRANSMISSIONS_TOLERANCE { return Ok(()); } // Retrieve the transactions.