Skip to content

Commit

Permalink
Adds quorum threshold checks before signing
Browse files Browse the repository at this point in the history
  • Loading branch information
howardwu committed Jul 5, 2023
1 parent b9a53df commit b831a5e
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 85 deletions.
12 changes: 9 additions & 3 deletions node/narwhal/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,15 @@ impl<N: Network> Gateway<N> {
let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
Ok(())
}
Event::CertificateRequest(..) | Event::CertificateResponse(..) => {
// Disconnect as the peer is not following the protocol.
bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
Event::CertificateRequest(certificate_request) => {
// Send the certificate request to the primary.
let _ = self.primary_sender().tx_certificate_request.send((peer_ip, certificate_request)).await;
Ok(())
}
Event::CertificateResponse(certificate_response) => {
// Send the certificate response to the primary.
let _ = self.primary_sender().tx_certificate_response.send((peer_ip, certificate_response)).await;
Ok(())
}
Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
// Disconnect as the peer is not following the protocol.
Expand Down
21 changes: 20 additions & 1 deletion node/narwhal/src/helpers/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{BatchPropose, BatchSignature, TransmissionRequest, TransmissionResponse};
use crate::{
BatchPropose,
BatchSignature,
CertificateRequest,
CertificateResponse,
TransmissionRequest,
TransmissionResponse,
};
use snarkvm::{
console::network::*,
ledger::narwhal::{BatchCertificate, Data, TransmissionID},
Expand All @@ -32,26 +39,34 @@ pub fn init_primary_channels<N: Network>() -> (PrimarySender<N>, PrimaryReceiver
let (tx_batch_propose, rx_batch_propose) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_batch_signature, rx_batch_signature) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_batch_certified, rx_batch_certified) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_certificate_request, rx_certificate_request) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_certificate_response, rx_certificate_response) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_unconfirmed_solution, rx_unconfirmed_solution) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_unconfirmed_transaction, rx_unconfirmed_transaction) = mpsc::channel(MAX_CHANNEL_SIZE);

let tx_batch_propose = Arc::new(tx_batch_propose);
let tx_batch_signature = Arc::new(tx_batch_signature);
let tx_batch_certified = Arc::new(tx_batch_certified);
let tx_certificate_request = Arc::new(tx_certificate_request);
let tx_certificate_response = Arc::new(tx_certificate_response);
let tx_unconfirmed_solution = Arc::new(tx_unconfirmed_solution);
let tx_unconfirmed_transaction = Arc::new(tx_unconfirmed_transaction);

let sender = PrimarySender {
tx_batch_propose,
tx_batch_signature,
tx_batch_certified,
tx_certificate_request,
tx_certificate_response,
tx_unconfirmed_solution,
tx_unconfirmed_transaction,
};
let receiver = PrimaryReceiver {
rx_batch_propose,
rx_batch_signature,
rx_batch_certified,
rx_certificate_request,
rx_certificate_response,
rx_unconfirmed_solution,
rx_unconfirmed_transaction,
};
Expand All @@ -64,6 +79,8 @@ pub struct PrimarySender<N: Network> {
pub tx_batch_propose: Arc<mpsc::Sender<(SocketAddr, BatchPropose<N>)>>,
pub tx_batch_signature: Arc<mpsc::Sender<(SocketAddr, BatchSignature<N>)>>,
pub tx_batch_certified: Arc<mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>>,
pub tx_certificate_request: Arc<mpsc::Sender<(SocketAddr, CertificateRequest<N>)>>,
pub tx_certificate_response: Arc<mpsc::Sender<(SocketAddr, CertificateResponse<N>)>>,
pub tx_unconfirmed_solution: Arc<mpsc::Sender<(PuzzleCommitment<N>, Data<ProverSolution<N>>)>>,
pub tx_unconfirmed_transaction: Arc<mpsc::Sender<(N::TransactionID, Data<Transaction<N>>)>>,
}
Expand All @@ -73,6 +90,8 @@ pub struct PrimaryReceiver<N: Network> {
pub rx_batch_propose: mpsc::Receiver<(SocketAddr, BatchPropose<N>)>,
pub rx_batch_signature: mpsc::Receiver<(SocketAddr, BatchSignature<N>)>,
pub rx_batch_certified: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>,
pub rx_certificate_request: mpsc::Receiver<(SocketAddr, CertificateRequest<N>)>,
pub rx_certificate_response: mpsc::Receiver<(SocketAddr, CertificateResponse<N>)>,
pub rx_unconfirmed_solution: mpsc::Receiver<(PuzzleCommitment<N>, Data<ProverSolution<N>>)>,
pub rx_unconfirmed_transaction: mpsc::Receiver<(N::TransactionID, Data<Transaction<N>>)>,
}
Expand Down
16 changes: 16 additions & 0 deletions node/narwhal/src/helpers/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use snarkvm::console::{prelude::*, types::Address};

use indexmap::IndexMap;
use std::collections::HashSet;

#[derive(Clone, Debug, PartialEq)]
pub struct Committee<N: Network> {
Expand Down Expand Up @@ -68,6 +69,21 @@ impl<N: Network> Committee<N> {
self.members.contains_key(&address)
}

/// Returns `true` if the combined stake for the given addresses reaches the quorum threshold.
/// This method takes in a `HashSet` to guarantee that the given addresses are unique.
pub fn is_quorum_threshold_reached(&self, addresses: &HashSet<Address<N>>) -> Result<bool> {
// Compute the combined stake for the given addresses.
let mut stake = 0u64;
for address in addresses {
stake = match stake.checked_add(self.get_stake(*address)) {
Some(stake) => stake,
None => bail!("Overflow when computing combined stake to check quorum threshold"),
};
}
// Return whether the combined stake reaches the quorum threshold.
Ok(stake >= self.quorum_threshold()?)
}

/// Returns the amount of stake for the given address.
pub fn get_stake(&self, address: Address<N>) -> u64 {
self.members.get(&address).copied().unwrap_or_default()
Expand Down
64 changes: 45 additions & 19 deletions node/narwhal/src/helpers/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ impl<N: Network> Storage<N> {
/// This method ensures the following invariants:
/// - The certificate ID does not already exist in storage.
/// - The batch ID does not already exist in storage.
/// - The certificate is well-formed.
/// - All transmissions declared in the certificate exist in storage (up to GC).
/// - All previous certificates declared in the certificate exist in storage (up to GC).
/// - All previous certificates are for the previous round (i.e. round - 1).
/// - The previous certificates reached the quorum threshold (2f+1).
pub fn insert_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
// Retrieve the round.
let round = certificate.round();
Expand All @@ -255,30 +258,53 @@ impl<N: Network> Storage<N> {
}

// TODO (howardwu): Ensure the certificate is well-formed. If not, do not store.
// TODO (howardwu): Ensure the round is within range. If not, do not store.
// TODO (howardwu): Ensure the address is in the committee of the specified round. If not, do not store.
// TODO (howardwu): Ensure I have all of the transmissions. If not, do not store.
// TODO (howardwu): Ensure I have all of the previous certificates. If not, do not store.
// TODO (howardwu): Ensure the previous certificates are for round-1. If not, do not store.
// TODO (howardwu): Ensure the previous certificates have reached 2f+1. If not, do not store.

// Iterate over the transmission IDs.
for transmission_id in certificate.transmission_ids() {
// Ensure storage contains the declared transmission ID.
if !self.contains_transmission(*transmission_id) {
bail!("Missing transmission {transmission_id} for certificate {certificate_id}");
// Retrieve the GC round.
let gc_round = self.gc_round();
// Compute the previous round.
let previous_round = round.saturating_sub(1);

// Check if the previous round is within range of the GC round.
if previous_round > gc_round {
// Ensure the previous round exists in storage.
if !self.contains_round(previous_round) {
bail!("Missing state for the previous round {previous_round} in storage (gc={gc_round})");
}
}

// If the certificate's round is greater than the GC round, ensure the transmissions exists.
if round > gc_round {
// Ensure storage contains all declared transmissions (up to GC).
for transmission_id in certificate.transmission_ids() {
// Ensure storage contains the declared transmission ID.
if !self.contains_transmission(*transmission_id) {
bail!("Missing transmission {transmission_id} for certificate in round {round} (gc={gc_round})");
}
}
}

// // Ensure storage contains all declared previous certificates (up to GC).
// for previous_certificate_id in certificate.previous_certificate_ids() {
// // If the certificate's round is greater than the GC round, ensure the previous certificate exists.
// if round > self.gc_round() {
// if !self.certificates.read().contains_key(previous_certificate_id) {
// bail!("Missing previous certificate {previous_certificate_id} for certificate {certificate_id}");
// }
// }
// }
// If the certificate's *previous* round is greater than the GC round, ensure the previous certificates exists.
if previous_round > gc_round {
// Retrieve the committee for the previous round.
let Some(previous_committee) = self.get_committee_for_round(previous_round) else {
bail!("Missing committee for the previous round {previous_round} in storage (gc={gc_round})");
};
// Ensure storage contains all declared previous certificates (up to GC).
for previous_certificate_id in certificate.previous_certificate_ids() {
// Retrieve the previous certificate.
let Some(previous_certificate) = self.get_certificate(*previous_certificate_id) else {
bail!("Missing previous certificate for certificate in round {round} (gc={gc_round})");
};
// Ensure the previous certificate is for the previous round.
if previous_certificate.round() != previous_round {
bail!(
"Previous certificate for round {previous_round} found in certificate for round {round} (gc={gc_round})"
);
}
}
}

/* Proceed to store the certificate. */

Expand Down
Loading

0 comments on commit b831a5e

Please sign in to comment.