diff --git a/narwhal/README.md b/narwhal/README.md
index b26cd81052631..f4f83b295f84c 100644
--- a/narwhal/README.md
+++ b/narwhal/README.md
@@ -1,60 +1,9 @@
-# Narwhal and Tusk
+# Narwhal HotStuff
-[![build status](https://img.shields.io/github/workflow/status/facebookresearch/narwhal/Rust/master?style=flat-square&logo=github)](https://github.com/facebookresearch/narwhal/actions) [![rustc](https://img.shields.io/badge/rustc-1.51+-blue?style=flat-square&logo=rust)](https://www.rust-lang.org) [![license](https://img.shields.io/badge/license-Apache-blue.svg?style=flat-square)](LICENSE)
-This repo provides an implementation of [Narwhal and Tusk](https://arxiv.org/pdf/2105.11827.pdf). The codebase has been designed to be small, efficient, and easy to benchmark and modify. It has not been designed to run in production but uses real cryptography ([dalek](https://doc.dalek.rs/ed25519_dalek)), networking ([tokio](https://docs.rs/tokio)), and storage ([rocksdb](https://docs.rs/rocksdb)).
-
-## Quick Start
-The core protocols are written in Rust, but all benchmarking scripts are written in Python and run with [Fabric](http://www.fabfile.org/).
-To deploy and benchmark a testbed of 4 nodes on your local machine, clone the repo and install the python dependencies:
-```
-$ git clone https://github.com/facebookresearch/narwhal.git
-$ cd narwhal/benchmark
-$ pip install -r requirements.txt
-```
-You also need to install Clang (required by rocksdb) and [tmux](https://linuxize.com/post/getting-started-with-tmux/#installing-tmux) (which runs all nodes and clients in the background). Finally, run a local benchmark using fabric:
-```
-$ fab local
-```
-This command may take a long time the first time you run it (compiling rust code in `release` mode may be slow) and you can customize a number of benchmark parameters in `fabfile.py`. When the benchmark terminates, it displays a summary of the execution similarly to the one below.
-```
------------------------------------------
- SUMMARY:
------------------------------------------
- + CONFIG:
- Faults: 0 node(s)
- Committee size: 4 node(s)
- Worker(s) per node: 1 worker(s)
- Collocate primary and workers: True
- Input rate: 50,000 tx/s
- Transaction size: 512 B
- Execution time: 19 s
-
- Header size: 1,000 B
- Max header delay: 100 ms
- GC depth: 50 round(s)
- Sync retry delay: 10,000 ms
- Sync retry nodes: 3 node(s)
- batch size: 500,000 B
- Max batch delay: 100 ms
-
- + RESULTS:
- Consensus TPS: 46,478 tx/s
- Consensus BPS: 23,796,531 B/s
- Consensus latency: 464 ms
-
- End-to-end TPS: 46,149 tx/s
- End-to-end BPS: 23,628,541 B/s
- End-to-end latency: 557 ms
------------------------------------------
-```
-
-## Next Steps
-The next step is to read the paper [Narwhal and Tusk: A DAG-based Mempool and Efficient BFT Consensus](https://arxiv.org/pdf/2105.11827.pdf). It is then recommended to have a look at the README files of the [worker](https://github.com/facebookresearch/narwhal/tree/master/worker) and [primary](https://github.com/facebookresearch/narwhal/tree/master/primary) crates. An additional resource to better understand the Tusk consensus protocol is the paper [All You Need is DAG](https://arxiv.org/abs/2102.08325) as it describes a similar protocol.
-
-The README file of the [benchmark folder](https://github.com/facebookresearch/narwhal/tree/master/benchmark) explains how to benchmark the codebase and read benchmarks' results. It also provides a step-by-step tutorial to run benchmarks on [Amazon Web Services (AWS)](https://aws.amazon.com) accross multiple data centers (WAN).
+The code in this branch is a prototype of Narwhal HotStuff (HotStuff-over-Narwhal). It supplements the paper [Narwhal and Tusk: A DAG-based Mempool and Efficient BFT Consensus](https://arxiv.org/pdf/2105.11827.pdf), enabling reproducible results. There are no plans to maintain this branch. The [master branch](https://github.com/asonnino/narwhal) contains the most recent and polished version of this codebase.
 
 ## License
 This software is licensed as [Apache 2.0](LICENSE).
diff --git a/narwhal/benchmark/benchmark/config.py b/narwhal/benchmark/benchmark/config.py
index eb39bb88a8791..bdd03d334b5fe 100644
--- a/narwhal/benchmark/benchmark/config.py
+++ b/narwhal/benchmark/benchmark/config.py
@@ -25,6 +25,9 @@ class Committee:
         "authorities: {
             "name": {
                 "stake": 1,
+                "consensus: {
+                    "consensus_to_consensus": x.x.x.x:x,
+                },
                 "primary: {
                     "primary_to_primary": x.x.x.x:x,
                     "worker_to_primary": x.x.x.x:x,
@@ -64,6 +67,11 @@ def __init__(self, addresses, base_port):
         self.json = {'authorities': OrderedDict()}
         for name, hosts in addresses.items():
             host = hosts.pop(0)
+            consensus_addr = {
+                'consensus_to_consensus': f'{host}:{port}',
+            }
+            port += 1
+
             primary_addr = {
                 'primary_to_primary': f'{host}:{port}',
                 'worker_to_primary': f'{host}:{port + 1}'
@@ -81,6 +89,7 @@ def __init__(self, addresses, base_port):
 
             self.json['authorities'][name] = {
                 'stake': 1,
+                'consensus': consensus_addr,
                 'primary': primary_addr,
                 'workers': workers_addr
             }
@@ -115,6 +124,9 @@ def ips(self, name=None):
         ips = set()
         for name in names:
+            addresses = self.json['authorities'][name]['consensus']
+            ips.add(self.ip(addresses['consensus_to_consensus']))
+
             addresses = self.json['authorities'][name]['primary']
             ips.add(self.ip(addresses['primary_to_primary']))
             ips.add(self.ip(addresses['worker_to_primary']))
@@ -157,7 +169,7 @@ def __init__(self, names, port, workers):
         assert all(isinstance(x, str) for x in names)
         assert isinstance(port, int)
         assert isinstance(workers, int) and workers > 0
-        addresses = OrderedDict((x, ['127.0.0.1']*(1+workers)) for x in names)
+        addresses = OrderedDict((x, ['127.0.0.1']*(2+workers)) for x in names)
 
         super().__init__(addresses, port)
 
@@ -165,6 +177,7 @@ class NodeParameters:
     def __init__(self, json):
         inputs = []
         try:
+            inputs += [json['timeout_delay']]
             inputs += [json['header_size']]
             inputs += [json['max_header_delay']]
             inputs += [json['gc_depth']]
@@ -203,7 +216,6 @@ def __init__(self, json):
             raise ConfigError('Missing input rate')
         self.rate = [int(x) for x in rate]
-
         self.workers = int(json['workers'])
 
         if 'collocate' in json:
@@ -212,7 +224,7 @@ def __init__(self, json):
             self.collocate = True
 
         self.tx_size = int(json['tx_size'])
-
+
         self.duration = int(json['duration'])
         self.runs = int(json['runs']) if 'runs' in json else 1
diff --git a/narwhal/benchmark/benchmark/logs.py b/narwhal/benchmark/benchmark/logs.py
index 47867ad3060ac..c3b0671c3db3c 100644
--- a/narwhal/benchmark/benchmark/logs.py
+++ b/narwhal/benchmark/benchmark/logs.py
@@ -23,7 +23,7 @@ def __init__(self, clients, primaries, workers, faults=0):
         self.faults = faults
         if isinstance(faults, int):
             self.committee_size = len(primaries) + int(faults)
-            self.workers = len(workers) // len(primaries)
+            self.workers = len(workers) // len(primaries)
         else:
             self.committee_size = '?'
             self.workers = '?'
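Taken together, the `config.py` changes above mean that each authority entry in the generated committee file now carries a `consensus` address block next to `primary` and `workers`, and that one extra port is allocated per authority. The sketch below is only an illustration of that layout: it uses trimmed, stand-alone copies of the `ConsensusAddresses`/`PrimaryAddresses`/`Authority` structs that this diff adds to `narwhal/config/src/lib.rs` further down, and the ports are made up.

```rust
// Stand-alone sketch (not the real `config` crate): trimmed copies of the structs
// this diff adds to narwhal/config/src/lib.rs, used to parse one authority entry
// of the committee file that benchmark/config.py now generates.
use serde::Deserialize;
use std::net::SocketAddr;

#[derive(Deserialize, Debug)]
struct ConsensusAddresses {
    // Address on which the HotStuff task listens for other consensus nodes.
    consensus_to_consensus: SocketAddr,
}

#[derive(Deserialize, Debug)]
struct PrimaryAddresses {
    primary_to_primary: SocketAddr,
    worker_to_primary: SocketAddr,
}

#[derive(Deserialize, Debug)]
struct Authority {
    stake: u64,                    // `Stake` in the real crate.
    consensus: ConsensusAddresses, // New section introduced by this diff.
    primary: PrimaryAddresses,
}

fn main() {
    // Hypothetical ports: config.py now reserves one extra port per authority for
    // `consensus_to_consensus`, hence the `2 + workers` addresses in LocalCommittee.
    let entry = r#"{
        "stake": 1,
        "consensus": { "consensus_to_consensus": "127.0.0.1:3000" },
        "primary": {
            "primary_to_primary": "127.0.0.1:3001",
            "worker_to_primary": "127.0.0.1:3002"
        }
    }"#;
    let authority: Authority = serde_json::from_str(entry).expect("valid committee entry");
    println!("{:?}", authority);
}
```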
@@ -107,6 +107,9 @@ def _parse_primaries(self, log):
         commits = self._merge_results([tmp])
 
         configs = {
+            'timeout_delay': int(
+                search(r'Timeout delay .* (\d+)', log).group(1)
+            ),
             'header_size': int(
                 search(r'Header size .* (\d+)', log).group(1)
             ),
@@ -131,7 +134,7 @@ def _parse_primaries(self, log):
         }
 
         ip = search(r'booted on (\d+.\d+.\d+.\d+)', log).group(1)
-
+
         return proposals, commits, configs, ip
 
     def _parse_workers(self, log):
@@ -188,6 +191,7 @@ def _end_to_end_latency(self):
         return mean(latency) if latency else 0
 
     def result(self):
+        timeout_delay = self.configs[0]['timeout_delay']
         header_size = self.configs[0]['header_size']
         max_header_delay = self.configs[0]['max_header_delay']
         gc_depth = self.configs[0]['gc_depth']
@@ -215,6 +219,7 @@ def result(self):
             f' Transaction size: {self.size[0]:,} B\n'
             f' Execution time: {round(duration):,} s\n'
             '\n'
+            f' Timeout delay: {timeout_delay:,} ms\n'
             f' Header size: {header_size:,} B\n'
             f' Max header delay: {max_header_delay:,} ms\n'
             f' GC depth: {gc_depth:,} round(s)\n'
diff --git a/narwhal/benchmark/benchmark/plot.py b/narwhal/benchmark/benchmark/plot.py
index 3d205111d7524..6f219ff99e3c0 100644
--- a/narwhal/benchmark/benchmark/plot.py
+++ b/narwhal/benchmark/benchmark/plot.py
@@ -7,7 +7,7 @@ from itertools import cycle
 
 from benchmark.utils import PathMaker
-from benchmark.config import PlotParameters
+from benchmark.config import PlotParameters, ConfigError
 from benchmark.aggregate import LogAggregator
 
 
@@ -162,8 +162,8 @@ def plot_tps(cls, files, scalability):
     def plot(cls, params_dict):
         try:
             params = PlotParameters(params_dict)
-        except PlotError as e:
-            raise PlotError('Invalid nodes or bench parameters', e)
+        except ConfigError as e:
+            raise PlotError(e)
 
         # Aggregate the logs.
         LogAggregator(params.max_latency).print()
diff --git a/narwhal/benchmark/fabfile.py b/narwhal/benchmark/fabfile.py
index b6425f2681cb7..8758eea04c15e 100644
--- a/narwhal/benchmark/fabfile.py
+++ b/narwhal/benchmark/fabfile.py
@@ -16,15 +16,16 @@ def local(ctx, debug=True):
         'faults': 0,
         'nodes': 4,
         'workers': 1,
-        'rate': 50_000,
+        'rate': 10_000,
         'tx_size': 512,
         'duration': 20,
     }
     node_params = {
+        'timeout_delay': 500,  # ms
         'header_size': 1_000,  # bytes
         'max_header_delay': 200,  # ms
         'gc_depth': 50,  # rounds
-        'sync_retry_delay': 10_000,  # ms
+        'sync_retry_delay': 5_000,  # ms
         'sync_retry_nodes': 3,  # number of nodes
         'batch_size': 500_000,  # bytes
         'max_batch_delay': 200  # ms
@@ -37,7 +38,7 @@ def local(ctx, debug=True):
 
 
 @task
-def create(ctx, nodes=2):
+def create(ctx, nodes=6):
     ''' Create a testbed'''
     try:
         InstanceManager.make().create_instances(nodes)
@@ -91,23 +92,24 @@ def install(ctx):
 
 
 @task
-def remote(ctx, debug=False):
+def remote(ctx, debug=True):
     ''' Run benchmarks on AWS '''
     bench_params = {
-        'faults': 3,
-        'nodes': [10],
-        'workers': 1,
-        'collocate': True,
-        'rate': [10_000, 110_000],
+        'faults': 0,
+        'nodes': [4],
+        'workers': 4,
+        'collocate': False,
+        'rate': [50_000],
         'tx_size': 512,
         'duration': 300,
-        'runs': 2,
+        'runs': 1,
     }
     node_params = {
+        'timeout_delay': 5_000,  # ms
         'header_size': 1_000,  # bytes
         'max_header_delay': 200,  # ms
         'gc_depth': 50,  # rounds
-        'sync_retry_delay': 10_000,  # ms
+        'sync_retry_delay': 5_000,  # ms
         'sync_retry_nodes': 3,  # number of nodes
         'batch_size': 500_000,  # bytes
         'max_batch_delay': 200  # ms
@@ -123,11 +125,11 @@ def plot(ctx):
     ''' Plot performance using the logs generated by "fab remote" '''
     plot_params = {
         'faults': [0],
-        'nodes': [10, 20, 50],
-        'workers': [1],
+        'nodes': [4],
+        'workers': [1, 4, 7, 10],
         'collocate': True,
         'tx_size': 512,
-        'max_latency': [3_500, 4_500]
+        'max_latency': [2_000, 2_500]
     }
 
     try:
         Ploter.plot(plot_params)
diff --git a/narwhal/benchmark/settings.json b/narwhal/benchmark/settings.json
index d13629f22b0b5..a8c248f80c499 100644
--- a/narwhal/benchmark/settings.json
+++ b/narwhal/benchmark/settings.json
@@ -1,13 +1,13 @@
 {
     "key": {
-        "name": "aws-fb",
-        "path": "/Users/asonnino/.ssh/aws-fb.pem"
+        "name": "aws",
+        "path": "/Users/asonnino/.ssh/aws"
     },
     "port": 5000,
     "repo": {
         "name": "narwhal",
-        "url": "https://github.com/facebookresearch/narwhal",
-        "branch": "master"
+        "url": "https://github.com/asonnino/narwhal",
+        "branch": "narwhal-hs"
     },
     "instances": {
         "type": "m5d.8xlarge",
diff --git a/narwhal/config/src/lib.rs b/narwhal/config/src/lib.rs
index b7fed7bb8a7d2..a04fc7537e254 100644
--- a/narwhal/config/src/lib.rs
+++ b/narwhal/config/src/lib.rs
@@ -60,6 +60,8 @@ pub type WorkerId = u32;
 
 #[derive(Deserialize, Clone)]
 pub struct Parameters {
+    /// The timeout delay of the consensus protocol.
+    pub timeout_delay: u64,
     /// The preferred header size. The primary creates a new header when it has enough parents and
     /// enough batches' digests to reach `header_size`. Denominated in bytes.
     pub header_size: usize,
@@ -84,6 +86,7 @@ pub struct Parameters {
 impl Default for Parameters {
     fn default() -> Self {
         Self {
+            timeout_delay: 5_000,
             header_size: 1_000,
             max_header_delay: 100,
             gc_depth: 50,
@@ -99,6 +102,8 @@ impl Import for Parameters {}
 
 impl Parameters {
     pub fn log(&self) {
+        // NOTE: These log entries are needed to compute performance.
+        info!("Timeout delay set to {} ms", self.timeout_delay);
         info!("Header size set to {} B", self.header_size);
         info!("Max header delay set to {} ms", self.max_header_delay);
         info!("Garbage collection depth set to {} rounds", self.gc_depth);
@@ -109,6 +114,12 @@ impl Parameters {
     }
 }
 
+#[derive(Clone, Deserialize)]
+pub struct ConsensusAddresses {
+    /// Address to receive messages from other consensus nodes (WAN).
+    pub consensus_to_consensus: SocketAddr,
+}
+
 #[derive(Clone, Deserialize)]
 pub struct PrimaryAddresses {
     /// Address to receive messages from other primaries (WAN).
@@ -131,6 +142,8 @@ pub struct WorkerAddresses {
 pub struct Authority {
     /// The voting power of this authority.
     pub stake: Stake,
+    /// The network addresses of the consensus protocol.
+    pub consensus: ConsensusAddresses,
     /// The network addresses of the primary.
     pub primary: PrimaryAddresses,
     /// Map of workers' id and their network addresses.
@@ -180,6 +193,23 @@ impl Committee {
         (total_votes + 2) / 3
     }
 
+    /// Returns the consensus addresses of the target consensus node.
+    pub fn consensus(&self, to: &PublicKey) -> Result<ConsensusAddresses, ConfigError> {
+        self.authorities
+            .get(to)
+            .map(|x| x.consensus.clone())
+            .ok_or_else(|| ConfigError::NotInCommittee(*to))
+    }
+
+    /// Returns the addresses of all consensus nodes except `myself`.
+    pub fn others_consensus(&self, myself: &PublicKey) -> Vec<(PublicKey, ConsensusAddresses)> {
+        self.authorities
+            .iter()
+            .filter(|(name, _)| name != &myself)
+            .map(|(name, authority)| (*name, authority.consensus.clone()))
+            .collect()
+    }
+
     /// Returns the primary addresses of the target primary.
     pub fn primary(&self, to: &PublicKey) -> Result<PrimaryAddresses, ConfigError> {
         self.authorities
diff --git a/narwhal/node/Cargo.toml b/narwhal/node/Cargo.toml
index 3a14aa2811c88..82f2d68edf134 100644
--- a/narwhal/node/Cargo.toml
+++ b/narwhal/node/Cargo.toml
@@ -21,10 +21,10 @@ store = { path = "../store" }
 crypto = { path = "../crypto" }
 primary = { path = "../primary" }
 worker = { path = "../worker" }
-consensus = { path = "../consensus" }
+hotstuff = { path = "../hotstuff" }
 
 [features]
-benchmark = ["worker/benchmark", "primary/benchmark", "consensus/benchmark"]
+benchmark = ["worker/benchmark", "primary/benchmark", "hotstuff/benchmark"]
 
 [[bin]]
 name = "benchmark_client"
diff --git a/narwhal/node/src/main.rs b/narwhal/node/src/main.rs
index e0e21f14259ce..e271d3f517e04 100644
--- a/narwhal/node/src/main.rs
+++ b/narwhal/node/src/main.rs
@@ -4,9 +4,10 @@ use clap::{crate_name, crate_version, App, AppSettings, ArgMatches, SubCommand};
 use config::Export as _;
 use config::Import as _;
 use config::{Committee, KeyPair, Parameters, WorkerId};
-use consensus::Consensus;
+use crypto::SignatureService;
 use env_logger::Env;
-use primary::{Certificate, Primary};
+use hotstuff::{Block, Consensus};
+use primary::Primary;
 use store::Store;
 use tokio::sync::mpsc::{channel, Receiver};
 use worker::Worker;
@@ -74,6 +75,7 @@ async fn run(matches: &ArgMatches<'_>) -> Result<()> {
     // Read the committee and node's keypair from file.
     let keypair = KeyPair::import(key_file).context("Failed to load the node's keypair")?;
+    let name = keypair.name;
     let committee =
         Committee::import(committee_file).context("Failed to load the committee information")?;
 
@@ -85,6 +87,9 @@ async fn run(matches: &ArgMatches<'_>) -> Result<()> {
         None => Parameters::default(),
     };
 
+    // The `SignatureService` provides signatures on input digests.
+    let signature_service = SignatureService::new(keypair.secret);
+
     // Make the data store.
     let store = Store::new(store_path).context("Failed to create a store")?;
 
@@ -98,18 +103,22 @@ async fn run(matches: &ArgMatches<'_>) -> Result<()> {
             let (tx_new_certificates, rx_new_certificates) = channel(CHANNEL_CAPACITY);
             let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY);
             Primary::spawn(
-                keypair,
+                name,
                 committee.clone(),
                 parameters.clone(),
-                store,
+                signature_service.clone(),
+                store.clone(),
                 /* tx_consensus */ tx_new_certificates,
                 /* rx_consensus */ rx_feedback,
             );
             Consensus::spawn(
+                name,
                 committee,
-                parameters.gc_depth,
-                /* rx_primary */ rx_new_certificates,
-                /* tx_primary */ tx_feedback,
+                parameters,
+                signature_service,
+                store,
+                /* rx_mempool */ rx_new_certificates,
+                /* tx_mempool */ tx_feedback,
                 tx_output,
             );
         }
@@ -134,8 +143,8 @@ async fn run(matches: &ArgMatches<'_>) -> Result<()> {
 }
 
 /// Receives an ordered list of certificates and apply any application-specific logic.
-async fn analyze(mut rx_output: Receiver<Certificate>) {
-    while let Some(_certificate) = rx_output.recv().await {
+async fn analyze(mut rx_output: Receiver<Block>) {
+    while let Some(_block) = rx_output.recv().await {
         // NOTE: Here goes the application logic.
     }
 }
diff --git a/narwhal/primary/src/core.rs b/narwhal/primary/src/core.rs
index 461c1ff5a824d..aee76c5526d2c 100644
--- a/narwhal/primary/src/core.rs
+++ b/narwhal/primary/src/core.rs
@@ -306,7 +306,7 @@ impl Core {
     fn sanitize_header(&mut self, header: &Header) -> DagResult<()> {
         ensure!(
             self.gc_round <= header.round,
-            DagError::TooOld(header.id.clone(), header.round)
+            DagError::HeaderTooOld(header.id.clone(), header.round)
         );
 
         // Verify the header's signature.
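The `Committee::consensus` and `Committee::others_consensus` accessors added to `narwhal/config/src/lib.rs` above mirror the existing `primary`/`others_primaries` helpers. As a rough usage sketch (the helper function itself is hypothetical and not part of this diff), a consensus task could collect the peer addresses it broadcasts to like this:

```rust
// Hypothetical helper (not part of this diff): gather the socket addresses a
// consensus node would broadcast HotStuff messages to, using the new
// `Committee::others_consensus` accessor from narwhal/config/src/lib.rs.
use config::Committee;
use crypto::PublicKey;
use std::net::SocketAddr;

fn consensus_peers(committee: &Committee, name: &PublicKey) -> Vec<SocketAddr> {
    committee
        .others_consensus(name) // every authority except ourselves
        .into_iter()
        .map(|(_, addresses)| addresses.consensus_to_consensus)
        .collect()
}
```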
@@ -320,7 +320,7 @@ impl Core {
     fn sanitize_vote(&mut self, vote: &Vote) -> DagResult<()> {
         ensure!(
             self.current_header.round <= vote.round,
-            DagError::TooOld(vote.digest(), vote.round)
+            DagError::VoteTooOld(vote.digest(), vote.round)
         );
 
         // Ensure we receive a vote on the expected header.
@@ -338,7 +338,7 @@ impl Core {
     fn sanitize_certificate(&mut self, certificate: &Certificate) -> DagResult<()> {
         ensure!(
             self.gc_round <= certificate.round(),
-            DagError::TooOld(certificate.digest(), certificate.round())
+            DagError::CertificateTooOld(certificate.digest(), certificate.round())
         );
 
         // Verify the certificate (and the embedded header).
@@ -393,7 +393,9 @@ impl Core {
                     error!("{}", e);
                     panic!("Storage failure: killing node.");
                 }
-                Err(e @ DagError::TooOld(..)) => debug!("{}", e),
+                Err(e @ DagError::HeaderTooOld(..)) => debug!("{}", e),
+                Err(e @ DagError::VoteTooOld(..)) => debug!("{}", e),
+                Err(e @ DagError::CertificateTooOld(..)) => debug!("{}", e),
                 Err(e) => warn!("{}", e),
             }
 
@@ -406,6 +408,7 @@ impl Core {
             self.certificates_aggregators.retain(|k, _| k >= &gc_round);
             self.cancel_handlers.retain(|k, _| k >= &gc_round);
             self.gc_round = gc_round;
+            debug!("GC round moved to {}", self.gc_round);
         }
     }
 }
diff --git a/narwhal/primary/src/error.rs b/narwhal/primary/src/error.rs
index a8e228266070d..ed2603ef2a094 100644
--- a/narwhal/primary/src/error.rs
+++ b/narwhal/primary/src/error.rs
@@ -54,6 +54,12 @@ pub enum DagError {
     #[error("Parents of header {0} are not a quorum")]
     HeaderRequiresQuorum(Digest),
 
-    #[error("Message {0} (round {1}) too old")]
-    TooOld(Digest, Round),
+    #[error("Header {0} (round {1}) too old")]
+    HeaderTooOld(Digest, Round),
+
+    #[error("Vote {0} (round {1}) too old")]
+    VoteTooOld(Digest, Round),
+
+    #[error("Certificate {0} (round {1}) too old")]
+    CertificateTooOld(Digest, Round),
 }
diff --git a/narwhal/primary/src/garbage_collector.rs b/narwhal/primary/src/garbage_collector.rs
index 0c0a0352fd757..f2349487bb9de 100644
--- a/narwhal/primary/src/garbage_collector.rs
+++ b/narwhal/primary/src/garbage_collector.rs
@@ -3,19 +3,25 @@ use crate::messages::Certificate;
 use crate::primary::PrimaryWorkerMessage;
 use bytes::Bytes;
 use config::Committee;
+use crypto::Hash as _;
 use crypto::PublicKey;
 use network::SimpleSender;
 use std::net::SocketAddr;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
-use tokio::sync::mpsc::Receiver;
+use store::Store;
+use tokio::sync::mpsc::{Receiver, Sender};
 
 /// Receives the highest round reached by consensus and update it for all tasks.
 pub struct GarbageCollector {
+    /// The persistent storage.
+    store: Store,
     /// The current consensus round (used for cleanup).
     consensus_round: Arc<AtomicU64>,
     /// Receives the ordered certificates from consensus.
     rx_consensus: Receiver<Certificate>,
+    /// A loopback channel to the primary's core.
+    tx_loopback: Sender<Certificate>,
     /// The network addresses of our workers.
     addresses: Vec<SocketAddr>,
     /// A network sender to notify our workers of cleanup events.
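Splitting `DagError::TooOld` into header/vote/certificate variants (see the `error.rs` hunk above) preserves the core's logging behaviour: stale traffic still only merits a debug line, while other DAG errors remain warnings. A small, hypothetical sketch of that pattern (not part of this diff, using the `DagError` re-exported by `primary/src/lib.rs` later in this diff):

```rust
// Hypothetical sketch: distinguish benign "too old" errors from the rest.
use primary::DagError;

fn is_stale(e: &DagError) -> bool {
    matches!(
        e,
        DagError::HeaderTooOld(..) | DagError::VoteTooOld(..) | DagError::CertificateTooOld(..)
    )
}

fn report(result: Result<(), DagError>) {
    match result {
        Ok(()) => (),
        // Messages referring to garbage-collected rounds are expected; log quietly.
        Err(e) if is_stale(&e) => log::debug!("{}", e),
        // Anything else may indicate a real problem.
        Err(e) => log::warn!("{}", e),
    }
}
```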
@@ -26,8 +32,10 @@ impl GarbageCollector {
     pub fn spawn(
         name: &PublicKey,
         committee: &Committee,
+        store: Store,
         consensus_round: Arc<AtomicU64>,
         rx_consensus: Receiver<Certificate>,
+        tx_loopback: Sender<Certificate>,
     ) {
         let addresses = committee
             .our_workers(name)
@@ -38,8 +46,10 @@ impl GarbageCollector {
         tokio::spawn(async move {
             Self {
+                store,
                 consensus_round,
                 rx_consensus,
+                tx_loopback,
                 addresses,
                 network: SimpleSender::new(),
             }
@@ -53,6 +63,21 @@ impl GarbageCollector {
         while let Some(certificate) = self.rx_consensus.recv().await {
             // TODO [issue #9]: Re-include batch digests that have not been sequenced into our next block.
 
+            // Loop back the certificate from HotStuff in case we haven't seen it.
+            if self
+                .store
+                .read(certificate.digest().to_vec())
+                .await
+                .expect("Failed to read from store")
+                .is_none()
+            {
+                self.tx_loopback
+                    .send(certificate.clone())
+                    .await
+                    .expect("Failed to loop back certificate to core");
+            }
+
+            // Cleanup all the modules.
             let round = certificate.round();
             if round > last_committed_round {
                 last_committed_round = round;
diff --git a/narwhal/primary/src/header_waiter.rs b/narwhal/primary/src/header_waiter.rs
index ec71942d096f2..8f9f38a500805 100644
--- a/narwhal/primary/src/header_waiter.rs
+++ b/narwhal/primary/src/header_waiter.rs
@@ -163,7 +163,7 @@ impl HeaderWaiter {
                     }
                     for (worker_id, digests) in requires_sync {
                         let address = self.committee
-                            .worker(&author, &worker_id)
+                            .worker(&self.name, &worker_id)
                             .expect("Author of valid header is not in the committee")
                             .primary_to_worker;
                         let message = PrimaryWorkerMessage::Synchronize(digests, author);
@@ -225,6 +225,7 @@ impl HeaderWaiter {
                 Some(result) = waiting.next() => match result {
                     Ok(Some(header)) => {
+                        debug!("Finished synching {:?}", header);
                         let _ = self.pending.remove(&header.id);
                         for x in header.payload.keys() {
                             let _ = self.batch_requests.remove(x);
diff --git a/narwhal/primary/src/lib.rs b/narwhal/primary/src/lib.rs
index 58b6d125f9a43..9918941f477b1 100644
--- a/narwhal/primary/src/lib.rs
+++ b/narwhal/primary/src/lib.rs
@@ -17,5 +17,6 @@ mod synchronizer;
 #[path = "tests/common.rs"]
 mod common;
 
+pub use crate::error::DagError;
 pub use crate::messages::{Certificate, Header};
 pub use crate::primary::{Primary, PrimaryWorkerMessage, Round, WorkerPrimaryMessage};
diff --git a/narwhal/primary/src/messages.rs b/narwhal/primary/src/messages.rs
index 4d04317689b6a..8f58fa2856dac 100644
--- a/narwhal/primary/src/messages.rs
+++ b/narwhal/primary/src/messages.rs
@@ -237,11 +237,16 @@ impl fmt::Debug for Certificate {
     fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
         write!(
             f,
-            "{}: C{}({}, {})",
+            "{}: C{}({}, {}, {:?})",
             self.digest(),
             self.round(),
             self.origin(),
-            self.header.id
+            self.header.id,
+            self.header
+                .parents
+                .iter()
+                .map(|x| format!("{}", x))
+                .collect::<Vec<_>>()
         )
     }
 }
diff --git a/narwhal/primary/src/primary.rs b/narwhal/primary/src/primary.rs
index 8ba317fe2741e..8616ac7dc3aa9 100644
--- a/narwhal/primary/src/primary.rs
+++ b/narwhal/primary/src/primary.rs
@@ -11,7 +11,7 @@ use crate::proposer::Proposer;
 use crate::synchronizer::Synchronizer;
 use async_trait::async_trait;
 use bytes::Bytes;
-use config::{Committee, KeyPair, Parameters, WorkerId};
+use config::{Committee, Parameters, WorkerId};
 use crypto::{Digest, PublicKey, SignatureService};
 use futures::sink::SinkExt as _;
 use log::info;
@@ -59,9 +59,10 @@ pub struct Primary;
 impl Primary {
     pub fn spawn(
-        keypair: KeyPair,
+        name: PublicKey,
         committee: Committee,
         parameters: Parameters,
+        signature_service: SignatureService,
         store: Store,
         tx_consensus: Sender<Certificate>,
         rx_consensus: Receiver<Certificate>,
@@ -78,12 +79,9 @@ impl Primary {
         let (tx_cert_requests, rx_cert_requests) = channel(CHANNEL_CAPACITY);
 
         // Write the parameters to the logs.
+        // NOTE: These log entries are needed to compute performance.
         parameters.log();
 
-        // Parse the public and secret key of this authority.
-        let name = keypair.name;
-        let secret = keypair.secret;
-
         // Atomic variable use to synchronizer all tasks with the latest consensus round. This is only
         // used for cleanup. The only tasks that write into this variable is `GarbageCollector`.
         let consensus_round = Arc::new(AtomicU64::new(0));
@@ -135,9 +133,6 @@ impl Primary {
             /* tx_certificate_waiter */ tx_sync_certificates,
         );
 
-        // The `SignatureService` is used to require signatures on specific digests.
-        let signature_service = SignatureService::new(secret);
-
         // The `Core` receives and handles headers, votes, and certificates from the other primaries.
         Core::spawn(
             name,
@@ -156,7 +151,14 @@ impl Primary {
         );
 
         // Keeps track of the latest consensus round and allows other tasks to clean up their their internal state
-        GarbageCollector::spawn(&name, &committee, consensus_round.clone(), rx_consensus);
+        GarbageCollector::spawn(
+            &name,
+            &committee,
+            store.clone(),
+            consensus_round.clone(),
+            rx_consensus,
+            tx_certificates_loopback.clone(),
+        );
 
         // Receives batch digests from other workers. They are only used to validate headers.
         PayloadReceiver::spawn(store.clone(), /* rx_workers */ rx_others_digests);
diff --git a/narwhal/primary/src/tests/common.rs b/narwhal/primary/src/tests/common.rs
index 408b97c16689c..4867c941d52b2 100644
--- a/narwhal/primary/src/tests/common.rs
+++ b/narwhal/primary/src/tests/common.rs
@@ -1,7 +1,7 @@
 // Copyright(C) Facebook, Inc. and its affiliates.
 use crate::messages::{Certificate, Header, Vote};
 use bytes::Bytes;
-use config::{Authority, Committee, PrimaryAddresses, WorkerAddresses};
+use config::{Authority, Committee, ConsensusAddresses, PrimaryAddresses, WorkerAddresses};
 use crypto::Hash as _;
 use crypto::{generate_keypair, PublicKey, SecretKey, Signature};
 use futures::sink::SinkExt as _;
@@ -38,6 +38,9 @@ pub fn committee() -> Committee {
         .iter()
         .enumerate()
         .map(|(i, (id, _))| {
+            let consensus = ConsensusAddresses {
+                consensus_to_consensus: format!("127.0.0.1:{}", 0 + i).parse().unwrap(),
+            };
             let primary = PrimaryAddresses {
                 primary_to_primary: format!("127.0.0.1:{}", 100 + i).parse().unwrap(),
                 worker_to_primary: format!("127.0.0.1:{}", 200 + i).parse().unwrap(),
@@ -57,6 +60,7 @@ pub fn committee() -> Committee {
                 *id,
                 Authority {
                     stake: 1,
+                    consensus,
                     primary,
                     workers,
                 },
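Finally, for anyone reproducing the benchmarks: the `Parameters` struct extended in `narwhal/config/src/lib.rs` above means a node's parameters file now needs a `timeout_delay` entry (in milliseconds) alongside the existing fields. The sketch below is only an illustration (a trimmed, stand-alone struct with assumed types for the fields not shown in this diff, and values copied from the `fabfile.py` defaults above rather than tuned recommendations):

```rust
// Stand-alone sketch (not the real `config` crate): a trimmed Parameters struct
// mirroring the fields this diff adds and logs, parsed from a hypothetical
// parameters file for a Narwhal-HotStuff node.
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct Parameters {
    timeout_delay: u64,    // ms, new in this branch
    header_size: usize,    // bytes
    max_header_delay: u64, // ms
    gc_depth: u64,         // rounds
    sync_retry_delay: u64, // ms
    sync_retry_nodes: usize,
    batch_size: usize,     // bytes
    max_batch_delay: u64,  // ms
}

fn main() {
    let file = r#"{
        "timeout_delay": 5000,
        "header_size": 1000,
        "max_header_delay": 200,
        "gc_depth": 50,
        "sync_retry_delay": 5000,
        "sync_retry_nodes": 3,
        "batch_size": 500000,
        "max_batch_delay": 200
    }"#;
    let parameters: Parameters = serde_json::from_str(file).expect("valid parameters file");
    println!("{:?}", parameters);
}
```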