Merge 3-chain
Alberto Sonnino committed May 10, 2021
1 parent 761a2de commit 645f4d3
Showing 14 changed files with 121 additions and 141 deletions.
9 changes: 4 additions & 5 deletions benchmark/aws/remote.py
@@ -155,9 +155,9 @@ def _config(self, hosts, node_parameters):

names = [x.name for x in keys]
consensus_addr = [f'{x}:{self.settings.consensus_port}' for x in hosts]
mempool_addr = [f'{x}:{self.settings.mempool_port}' for x in hosts]
front_addr = [f'{x}:{self.settings.front_port}' for x in hosts]
committee = Committee(names, consensus_addr, mempool_addr, front_addr)
mempool_addr = [f'{x}:{self.settings.mempool_port}' for x in hosts]
committee = Committee(names, consensus_addr, front_addr, mempool_addr)
committee.print(PathMaker.committee_file())

node_parameters.print(PathMaker.parameters_file())
@@ -187,9 +187,8 @@ def _run_single(self, hosts, rate, bench_parameters, node_parameters, debug=False):
# Filter all faulty nodes from the client addresses (or they will wait
# for the faulty nodes to be online).
committee = Committee.load(PathMaker.committee_file())
addresses = committee.front_addresses()
addresses = [x for x in addresses if any(host in x for host in hosts)]
rate_share = ceil(rate / committee.size())
addresses = [f'{x}:{self.settings.front_port}' for x in hosts]
rate_share = ceil(rate / committee.size()) # Take faults into account.
timeout = node_parameters.timeout_delay
client_logs = [PathMaker.client_log_file(i) for i in range(len(hosts))]
for host, addr, log_file in zip(hosts, addresses, client_logs):
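Note on this hunk: the clients now derive their target addresses directly from the already fault-filtered hosts list, while each client's rate share still divides the total rate by the full committee size. A small Python sketch of the resulting behaviour, with made-up hosts, port, and rate (none of these values come from the commit):

from math import ceil

hosts = ['10.0.0.1', '10.0.0.2', '10.0.0.3']     # only the live (non-faulty) machines
front_port = 9000                                # hypothetical settings.front_port
committee_size = 4                               # includes the one faulty node
rate = 100_000                                   # total target input rate (tx/s)

addresses = [f'{x}:{front_port}' for x in hosts]
rate_share = ceil(rate / committee_size)         # 25_000 tx/s per client
# Only 3 of the 4 clients run, so the aggregate offered load drops to 75_000 tx/s,
# i.e. rate * (n - f) / n, which is what 'Take faults into account' refers to.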
4 changes: 0 additions & 4 deletions benchmark/benchmark/config.py
@@ -52,10 +52,6 @@ def print(self, filename):
with open(filename, 'w') as f:
dump(self.json, f, indent=4, sort_keys=True)

def front_addresses(self):
authorities = self.json['mempool']['authorities']
return [x['front_address'] for x in authorities.values()]

def size(self):
return len(self.json['consensus']['authorities'])

4 changes: 2 additions & 2 deletions benchmark/benchmark/local.py
@@ -76,7 +76,7 @@ def run(self, debug=False):
nodes = nodes - self.faults

# Run the clients (they will wait for the nodes to be ready).
addresses = committee.front_addresses()
addresses = committee.front
rate_share = ceil(rate / nodes)
timeout = self.node_parameters.timeout_delay
client_logs = [PathMaker.client_log_file(i) for i in range(nodes)]
@@ -113,7 +113,7 @@ def run(self, debug=False):

# Parse logs and return the parser.
Print.info('Parsing logs...')
return LogParser.process('./logs')
return LogParser.process('./logs', faults=self.faults)

except (subprocess.SubprocessError, ParseError) as e:
self._kill_nodes()
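Note on this hunk: the local benchmark now forwards its fault count to the log parser. A plausible reading (the parser internals are not shown in this hunk): faulty nodes never start and therefore leave no logs, so the parser cannot recover the configured committee size from the log directory alone. Rough sketch under that assumption, with an invented path:

from glob import glob
from os.path import join

faults = 1                                       # hypothetical
live = len(glob(join('./logs', 'node-*.log')))   # log files exist only for live nodes
committee_size = live + faults                   # the size the benchmark was configured with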
30 changes: 14 additions & 16 deletions benchmark/benchmark/logs.py
@@ -59,6 +59,7 @@ def __init__(self, clients, nodes, faults=0):
Print.warn(f'Nodes timed out {self.timeouts:,} time(s)')

def _merge_results(self, input):
# Keep the earliest timestamp.
merged = {}
for x in input:
for k, v in x:
@@ -78,8 +79,8 @@ def _parse_clients(self, log):

misses = len(findall(r'rate too high', log))

tmp = findall(r'\[(.*Z) .* sample transaction', log)
samples = [self._to_posix(x) for x in tmp]
tmp = findall(r'\[(.*Z) .* sample transaction (\d+)', log)
samples = {int(s): self._to_posix(t) for t, s in tmp}

return size, rate, start, misses, samples
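The client parser now captures the id of each sample transaction, turning the flat list of timestamps into a dictionary keyed by sample id. A standalone sketch of the regex at work; the exact wording of the client log line is assumed here, only the pattern comes from the diff:

from re import findall

log = (
    '[2021-05-10T09:00:00.100Z INFO client] Sending sample transaction 0\n'
    '[2021-05-10T09:00:00.200Z INFO client] Sending sample transaction 1\n'
)
tmp = findall(r'\[(.*Z) .* sample transaction (\d+)', log)
samples = {int(s): t for t, s in tmp}   # {0: '2021-...100Z', 1: '2021-...200Z'}
# The real code converts each timestamp with self._to_posix(t) before storing it.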

Expand All @@ -98,8 +99,8 @@ def _parse_nodes(self, log):
tmp = findall(r'Payload ([^ ]+) contains (\d+) B', log)
sizes = {d: int(s) for d, s in tmp}

tmp = findall(r'\[(.*Z) .* Payload ([^ ]+) contains (\d+) sample', log)
samples = {d: (int(s), self._to_posix(t)) for t, d, s in tmp}
tmp = findall(r'Payload ([^ ]+) contains sample tx (\d+)', log)
samples = {int(s): d for d, s in tmp}

tmp = findall(r'.* WARN .* Timeout', log)
timeouts = len(tmp)
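On the node side, the parser no longer counts samples per payload; it records which payload (batch) contains each sample id. Sketch with an invented digest and log text; the regex is the one from the diff:

from re import findall

log = 'Payload 8kQ1xYz= contains sample tx 0\nPayload 8kQ1xYz= contains sample tx 1\n'
tmp = findall(r'Payload ([^ ]+) contains sample tx (\d+)', log)
samples = {int(s): d for d, s in tmp}   # {0: '8kQ1xYz=', 1: '8kQ1xYz='}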
@@ -155,16 +156,13 @@ def _end_to_end_throughput(self):

def _end_to_end_latency(self):
latency = []
for sent, data in zip(self.sent_samples, self.received_samples):
sent.sort()

ordered = sorted(list(data.items()), key=lambda x: x[1][1])
commit = []
for digest, (occurrences, _) in ordered:
tmp = self.commits.get(digest)
commit += [tmp] * occurrences

latency += [x - y for x, y in zip(commit, sent) if x is not None]
for sent, received in zip(self.sent_samples, self.received_samples):
for tx_id, batch_id in received.items():
if batch_id in self.commits:
assert tx_id in sent # We receive txs that we sent.
start = sent[tx_id]
end = self.commits[batch_id]
latency += [end-start]
return mean(latency) if latency else 0

def result(self):
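With both dictionaries keyed by sample id, end-to-end latency becomes a direct lookup: send time from the client, commit time of the batch that carried the sample. A worked example with invented values (timestamps in seconds):

from statistics import mean

sent = {0: 10.0, 1: 10.5}                  # sample id -> send time (one client)
received = {0: 'digestA', 1: 'digestB'}    # sample id -> batch digest (matching node)
commits = {'digestA': 12.0}                # batch digest -> commit time; digestB not committed yet

latency = [commits[batch] - sent[tx] for tx, batch in received.items() if batch in commits]
print(mean(latency) if latency else 0)     # 2.0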
@@ -216,11 +214,11 @@ def process(cls, directory, faults=0):
assert isinstance(directory, str)

clients = []
for filename in glob(join(directory, 'client-*.log')):
for filename in sorted(glob(join(directory, 'client-*.log'))):
with open(filename, 'r') as f:
clients += [f.read()]
nodes = []
for filename in glob(join(directory, 'node-*.log')):
for filename in sorted(glob(join(directory, 'node-*.log'))):
with open(filename, 'r') as f:
nodes += [f.read()]

6 changes: 3 additions & 3 deletions benchmark/benchmark/plot.py
@@ -2,6 +2,7 @@
import matplotlib.pyplot as plt
from matplotlib.ticker import StrMethodFormatter
from glob import glob
from itertools import cycle

from benchmark.utils import PathMaker
from benchmark.config import PlotParameters
@@ -54,6 +55,7 @@ def _bps2tps(self, x):

def _plot(self, x_label, y_label, y_axis, z_axis, type):
plt.figure()
markers = cycle(['o', 'v', 's', 'p', 'D', 'P'])
self.results.sort(key=self._natural_keys, reverse=(type == 'tps'))
for result in self.results:
y_values, y_err = y_axis(result)
@@ -63,10 +65,8 @@ def _plot(self, x_label, y_label, y_axis, z_axis, type):

plt.errorbar(
x_values, y_values, yerr=y_err, # uplims=True, lolims=True,
marker='o', label=z_axis(result), linestyle='dotted'
marker=next(markers), label=z_axis(result), linestyle='dotted'
)
# if type == 'latency':
# plt.yscale('log')

plt.legend(loc='lower center', bbox_to_anchor=(0.5, 1), ncol=2)
plt.xlim(xmin=0)
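Cycling through a fixed set of markers gives each curve its own symbol instead of reusing 'o' for every series. Minimal matplotlib sketch of the idea; the series names and values are invented:

from itertools import cycle
import matplotlib.pyplot as plt

markers = cycle(['o', 'v', 's', 'p', 'D', 'P'])
series = {'10 nodes': [(10_000, 1.2), (20_000, 1.5)],
          '20 nodes': [(10_000, 1.4), (20_000, 1.9)]}

plt.figure()
for label, points in series.items():
    x, y = zip(*points)
    # Each curve picks the next marker, so overlapping dotted lines stay distinguishable.
    plt.errorbar(x, y, marker=next(markers), label=label, linestyle='dotted')
plt.legend()
plt.savefig('sketch.pdf')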
61 changes: 47 additions & 14 deletions consensus/src/core.rs
@@ -13,6 +13,7 @@ use log::{debug, error, info, warn};
use network::NetMessage;
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::collections::VecDeque;
use store::Store;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{sleep, Duration};
@@ -47,6 +48,7 @@ pub struct Core {
commit_channel: Sender<Block>,
round: RoundNumber,
last_voted_round: RoundNumber,
last_committed_round: RoundNumber,
high_qc: QC,
timer: Timer,
aggregator: Aggregator,
@@ -83,6 +85,7 @@ impl Core {
core_channel,
round: 1,
last_voted_round: 0,
last_committed_round: 0,
high_qc: QC::genesis(),
timer,
aggregator,
@@ -118,6 +121,48 @@ impl Core {
// TODO [issue #15]: Write to storage preferred_round and last_voted_round.
Some(Vote::new(&block, self.name, self.signature_service.clone()).await)
}

async fn commit(&mut self, block: Block) -> ConsensusResult<()> {
if self.last_committed_round >= block.round {
return Ok(());
}

let mut to_commit = VecDeque::new();
to_commit.push_back(block.clone());

// Ensure we commit the entire chain. This is needed after view-change.
let mut parent = block.clone();
while self.last_committed_round + 1 < parent.round {
let ancestor = self
.synchronizer
.get_parent_block(&parent)
.await?
.expect("We should have all the ancestors by now");
to_commit.push_front(ancestor.clone());
parent = ancestor;
}

// Save the last committed block.
self.last_committed_round = block.round;

// Send all the newly committed blocks to the node's application layer.
while let Some(block) = to_commit.pop_back() {
if !block.payload.is_empty() {
info!("Committed {}", block);

#[cfg(feature = "benchmark")]
for x in &block.payload {
// NOTE: This log entry is used to compute performance.
info!("Committed B{}({})", block.round, base64::encode(x));
}
}
debug!("Committed {:?}", block);
if let Err(e) = self.commit_channel.send(block).await {
warn!("Failed to send block through the commit channel: {}", e);
}
}
Ok(())
}
// -- End Safety Module --

// -- Start Pacemaker --
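The new commit() walks back from the block being committed until it reconnects with the last committed round, so nothing is skipped after a view change. A toy Python model of that walk (block names, rounds, and the parent map are made up; in the Rust code the synchronizer supplies the ancestors):

from collections import deque

parents = {'B5': 'B4', 'B4': 'B2'}    # child -> parent; no block was produced at round 3
rounds = {'B2': 2, 'B4': 4, 'B5': 5}
last_committed_round = 1

block = 'B5'
to_commit = deque([block])
parent = block
while last_committed_round + 1 < rounds[parent]:
    parent = parents[parent]          # 'We should have all the ancestors by now'
    to_commit.appendleft(parent)
last_committed_round = rounds[block]

print(list(to_commit))                # ['B2', 'B4', 'B5']: the whole chain is delivered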
@@ -285,7 +330,7 @@ impl Core {
// Let's see if we have the last three ancestors of the block, that is:
// b0 <- |qc0; b1| <- |qc1; block|
// If we don't, the synchronizer asks for them to other nodes. It will
// then ensure we process all three ancestors in the correct order, and
// then ensure we process both ancestors in the correct order, and
// finally make us resume processing this block.
let (b0, b1) = match self.synchronizer.get_ancestors(block).await? {
Some(ancestors) => ancestors,
@@ -301,19 +346,7 @@ impl Core {
// Check if we can commit the head of the 2-chain.
// Note that we commit blocks only if we have all its ancestors.
if b0.round + 1 == b1.round {
if !b0.payload.is_empty() {
info!("Committed {}", b0);

#[cfg(feature = "benchmark")]
for x in &b0.payload {
// NOTE: This log entry is used to compute performance.
info!("Committed B{}({})", b0.round, base64::encode(x));
}
}
debug!("Committed {:?}", b0);
if let Err(e) = self.commit_channel.send(b0.clone()).await {
warn!("Failed to send block through the commit channel: {}", e);
}
self.commit(b0.clone()).await?;
}

// Cleanup the mempool.
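For context, the commit trigger itself stays a simple round check on the two certified ancestors: the head of the 2-chain is committed only when those ancestors sit in consecutive rounds. A one-line illustration:

def can_commit(b0_round: int, b1_round: int) -> bool:
    return b0_round + 1 == b1_round   # a timeout between b0 and b1 breaks the chain

assert can_commit(4, 5) and not can_commit(3, 5)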
2 changes: 1 addition & 1 deletion consensus/src/leader.rs
@@ -16,6 +16,6 @@ impl RRLeaderElector {
pub fn get_leader(&self, round: RoundNumber) -> PublicKey {
let mut keys: Vec<_> = self.committee.authorities.keys().cloned().collect();
keys.sort();
keys[(round / 2) as usize % self.committee.size()]
keys[round as usize % self.committee.size()]
}
}
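Dropping the division by two means the round-robin leader now changes every round rather than every two rounds. Quick Python sketch with placeholder keys:

keys = sorted(['key-a', 'key-b', 'key-c', 'key-d'])   # stand-ins for the sorted public keys

def old_leader(r):   # before: two consecutive rounds share a leader
    return keys[(r // 2) % len(keys)]

def new_leader(r):   # after: a fresh leader every round
    return keys[r % len(keys)]

print([old_leader(r) for r in range(4)])   # ['key-a', 'key-a', 'key-b', 'key-b']
print([new_leader(r) for r in range(4)])   # ['key-a', 'key-b', 'key-c', 'key-d']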
2 changes: 1 addition & 1 deletion consensus/src/synchronizer.rs
@@ -128,7 +128,7 @@ impl Synchronizer {
Ok(())
}

async fn get_parent_block(&mut self, block: &Block) -> ConsensusResult<Option<Block>> {
pub async fn get_parent_block(&mut self, block: &Block) -> ConsensusResult<Option<Block>> {
if block.qc == QC::genesis() {
return Ok(Some(Block::genesis()));
}
17 changes: 0 additions & 17 deletions consensus/src/tests/consensus_tests.rs
@@ -62,20 +62,3 @@ async fn end_to_end() {
let blocks = try_join_all(handles).await.unwrap();
assert!(blocks.windows(2).all(|w| w[0] == w[1]));
}

#[tokio::test]
async fn dead_node() {
let mut committee = committee();
committee.increment_base_port(6100);

// Run all nodes but the first.
let leader_elector = LeaderElector::new(committee.clone());
let dead = leader_elector.get_leader(0);
let keys = keys().into_iter().filter(|(x, _)| *x != dead).collect();
let store_path = ".db_test_dead_node";
let handles = spawn_nodes(keys, committee, store_path);

// Ensure all threads terminated correctly.
let blocks = try_join_all(handles).await.unwrap();
assert!(blocks.windows(2).all(|w| w[0] == w[1]));
}
5 changes: 3 additions & 2 deletions consensus/src/tests/core_tests.rs
@@ -153,7 +153,7 @@ async fn generate_proposal() {
#[tokio::test]
async fn commit_block() {
// Get enough distinct leaders to form a quorum.
let leaders = vec![leader_keys(1), leader_keys(2), leader_keys(4)];
let leaders = vec![leader_keys(1), leader_keys(2), leader_keys(3)];
let chain = chain(leaders);

// Run a core instance.
@@ -162,14 +162,15 @@
let (tx_core, _rx_network, mut rx_commit) = core(public_key, secret_key, store_path).await;

// Send the blocks to the core.
let committed = chain[0].clone();
for block in chain {
let message = ConsensusMessage::Propose(block);
tx_core.send(message).await.unwrap();
}

// Ensure the core commits the head.
match rx_commit.recv().await {
Some(b) => assert_eq!(b, Block::genesis()),
Some(b) => assert_eq!(b, committed),
_ => assert!(false),
}
}
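The updated test expects the first block of the chain, not genesis, to come out of the commit channel once its two successors arrive. Toy model of that expectation (the Rust test drives a real Core instance; this only mimics the round arithmetic):

rounds = [1, 2, 3]                      # rounds of chain[0], chain[1], chain[2]
committed = []
for i in range(2, len(rounds)):
    b0, b1 = rounds[i - 2], rounds[i - 1]
    if b0 + 1 == b1:                    # 2-chain over consecutive rounds
        committed.append(f'B{b0}')
print(committed)                        # ['B1'] -> the head of the chain, not genesis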
21 changes: 15 additions & 6 deletions mempool/src/core.rs
@@ -12,6 +12,8 @@ use log::{error, warn};
use network::NetMessage;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
#[cfg(feature = "benchmark")]
use std::convert::TryInto as _;
use store::Store;
use tokio::sync::mpsc::{Receiver, Sender};

@@ -103,12 +105,19 @@ impl Core {
info!("Payload {:?} contains {} B", digest, payload.size());

#[cfg(feature = "benchmark")]
if payload.sample_txs > 0 {
// NOTE: This log entry is used to compute performance.
info!(
"Payload {:?} contains {} sample tx(s)",
digest, payload.sample_txs
);
for tx in &payload.transactions {
// Look for sample txs (they all start with 0) and gather their
// txs id (the next 8 bytes).
if tx[0] == 0u8 && tx.len() > 8 {
if let Ok(id) = tx[1..9].try_into() {
// NOTE: This log entry is used to compute performance.
info!(
"Payload {:?} contains sample tx {}",
digest,
u64::from_be_bytes(id)
);
}
}
}

// Store the payload.
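Instead of a per-payload sample counter, the mempool now recognises sample transactions by their content: a zero flag byte followed by a big-endian u64 id. A Python sketch of a matching transaction and of the check; how the benchmark client actually builds and pads its transactions is assumed here, only the flag-byte/id layout comes from the diff:

tx_id = 42
sample_tx = bytes([0]) + tx_id.to_bytes(8, 'big') + bytes(503)   # padding size is illustrative

def parse_sample(tx: bytes):
    # Mirrors the Rust check: first byte 0 marks a sample, the next 8 bytes carry its id.
    if tx[0] == 0 and len(tx) > 8:
        return int.from_bytes(tx[1:9], 'big')
    return None

assert parse_sample(sample_tx) == 42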
5 changes: 0 additions & 5 deletions mempool/src/messages.rs
@@ -12,23 +12,18 @@ pub struct Payload {
pub transactions: Vec<Transaction>,
pub author: PublicKey,
pub signature: Signature,
#[cfg(feature = "benchmark")]
pub sample_txs: usize,
}

impl Payload {
pub async fn new(
transactions: Vec<Transaction>,
author: PublicKey,
mut signature_service: SignatureService,
#[cfg(feature = "benchmark")] sample_txs: usize,
) -> Self {
let payload = Self {
transactions,
author,
signature: Signature::default(),
#[cfg(feature = "benchmark")]
sample_txs,
};
let signature = signature_service.request_signature(payload.digest()).await;
Self {