Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize windows with last up-to-WINDOW_SIZE blobs #524

Merged
merged 3 commits into from
Jul 9, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 71 additions & 27 deletions src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
use std::time::Instant;
use streamer::WINDOW_SIZE;
use timing::duration_as_us;
use transaction::{Instruction, Plan, Transaction};

Expand Down Expand Up @@ -306,10 +307,7 @@ impl Bank {
}

/// Process an ordered list of entries.
pub fn process_entries<I>(&self, entries: I) -> Result<u64>
where
I: IntoIterator<Item = Entry>,
{
pub fn process_entries(&self, entries: Vec<Entry>) -> Result<u64> {
let mut entry_count = 0;
for entry in entries {
entry_count += 1;
Expand Down Expand Up @@ -348,7 +346,7 @@ impl Bank {
}

/// Process a full ledger.
pub fn process_ledger<I>(&self, entries: I) -> Result<u64>
pub fn process_ledger<I>(&self, entries: I) -> Result<(u64, Vec<Entry>)>
where
I: IntoIterator<Item = Entry>,
{
Expand All @@ -364,20 +362,39 @@ impl Bank {
let entry1 = entries
.next()
.expect("invalid ledger: need at least 2 entries");
let tx = &entry1.transactions[0];
let deposit = if let Instruction::NewContract(contract) = &tx.instruction {
contract.plan.final_payment()
} else {
None
}.expect("invalid ledger, needs to start with a contract");
{
let tx = &entry1.transactions[0];
let deposit = if let Instruction::NewContract(contract) = &tx.instruction {
contract.plan.final_payment()
} else {
None
}.expect("invalid ledger, needs to start with a contract");

self.apply_payment(&deposit, &mut self.balances.write().unwrap());
self.apply_payment(&deposit, &mut self.balances.write().unwrap());
}
self.register_entry_id(&entry0.id);
self.register_entry_id(&entry1.id);

let mut entry_count = 2;
entry_count += self.process_blocks(entries)?;
Ok(entry_count)
let mut tail = Vec::with_capacity(WINDOW_SIZE as usize);
let mut next = Vec::with_capacity(WINDOW_SIZE as usize);

for block in &entries.into_iter().chunks(WINDOW_SIZE as usize) {
tail = next;
next = block.collect();
entry_count += self.process_blocks(next.clone())?;
}

tail.append(&mut next);

if tail.len() < WINDOW_SIZE as usize {
tail.insert(0, entry1);
if tail.len() < WINDOW_SIZE as usize {
tail.insert(0, entry0);
}
}

Ok((entry_count, tail))
}

/// Process a Witness Signature. Any payment plans waiting on this signature
Expand Down Expand Up @@ -483,9 +500,9 @@ mod tests {
use super::*;
use bincode::serialize;
use entry::next_entry;
use entry::Entry;
use entry_writer::{self, EntryWriter};
use hash::hash;
use ledger::next_entries;
use signature::KeyPairUtil;
use std::io::{BufReader, Cursor, Seek, SeekFrom};

Expand Down Expand Up @@ -720,25 +737,52 @@ mod tests {
assert_eq!(bank.get_balance(&mint.pubkey()), 1);
}

fn create_sample_block(mint: &Mint) -> impl Iterator<Item = Entry> {
let keypair = KeyPair::new();
let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id());
next_entries(&mint.last_id(), 0, vec![tx]).into_iter()
fn create_sample_block(mint: &Mint, length: usize) -> impl Iterator<Item = Entry> {
let mut entries = Vec::with_capacity(length);
let mut hash = mint.last_id();
let mut cur_hashes = 0;
for _ in 0..length {
let keypair = KeyPair::new();
let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id());
let entry = Entry::new_mut(&mut hash, &mut cur_hashes, vec![tx], false);
entries.push(entry);
}
entries.into_iter()
}

fn create_sample_ledger() -> (impl Iterator<Item = Entry>, PublicKey) {
let mint = Mint::new(2);
fn create_sample_ledger(length: usize) -> (impl Iterator<Item = Entry>, PublicKey) {
let mint = Mint::new(1 + length as i64);
let genesis = mint.create_entries();
let block = create_sample_block(&mint);
let block = create_sample_block(&mint, length);
(genesis.into_iter().chain(block), mint.pubkey())
}

#[test]
fn test_process_ledger() {
let (ledger, pubkey) = create_sample_ledger();
let (ledger, pubkey) = create_sample_ledger(1);
let (ledger, dup) = ledger.tee();
let bank = Bank::default();
bank.process_ledger(ledger).unwrap();
let (ledger_height, tail) = bank.process_ledger(ledger).unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a second process_ledger test for when the ledger is longer than the tail? It should probably test that the last Entry.id in the window matches bank.last_id().

assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, 3);
assert_eq!(tail.len(), 3);
assert_eq!(tail, dup.collect_vec());
let last_entry = &tail[tail.len() - 1];
assert_eq!(bank.last_id(), last_entry.id);
}

#[test]
fn test_process_ledger_around_window_size() {
let window_size = WINDOW_SIZE as usize;
for entry_count in window_size - 1..window_size + 1 {
let (ledger, pubkey) = create_sample_ledger(entry_count);
let bank = Bank::default();
let (ledger_height, tail) = bank.process_ledger(ledger).unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, entry_count as u64 + 2);
assert!(tail.len() <= window_size);
let last_entry = &tail[tail.len() - 1];
assert_eq!(bank.last_id(), last_entry.id);
}
}

// Write the given entries to a file and then return a file iterator to them.
Expand All @@ -753,7 +797,7 @@ mod tests {

#[test]
fn test_process_ledger_from_file() {
let (ledger, pubkey) = create_sample_ledger();
let (ledger, pubkey) = create_sample_ledger(1);
let ledger = to_file_iter(ledger);

let bank = Bank::default();
Expand All @@ -765,7 +809,7 @@ mod tests {
fn test_process_ledger_from_files() {
let mint = Mint::new(2);
let genesis = to_file_iter(mint.create_entries().into_iter());
let block = to_file_iter(create_sample_block(&mint));
let block = to_file_iter(create_sample_block(&mint, 1));

let bank = Bank::default();
bank.process_ledger(genesis.chain(block)).unwrap();
Expand Down
14 changes: 10 additions & 4 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ impl Crdt {
//filter myself
false
} else if v.replicate_addr == daddr {
//filter nodes that are not listening
trace!("broadcast skip not listening {:x}", v.debug_id());
false
} else {
trace!("broadcast node {}", v.replicate_addr);
Expand All @@ -400,7 +400,7 @@ impl Crdt {
warn!("crdt too small");
Err(CrdtError::TooSmall)?;
}
trace!("nodes table {}", nodes.len());
trace!("broadcast nodes {}", nodes.len());

// enumerate all the blobs in the window, those are the indices
// transmit them to nodes, starting from a different node
Expand All @@ -414,7 +414,7 @@ impl Crdt {
orders.push((window_l[k].clone(), nodes[is % nodes.len()]));
}

trace!("orders table {}", orders.len());
trace!("broadcast orders table {}", orders.len());
let errs: Vec<_> = orders
.into_iter()
.map(|(b, v)| {
Expand Down Expand Up @@ -471,13 +471,14 @@ impl Crdt {
trace!("skip retransmit to leader {:?}", v.id);
false
} else if v.replicate_addr == daddr {
trace!("skip nodes that are not listening {:?}", v.id);
trace!("retransmit skip not listening {:x}", v.debug_id());
false
} else {
true
}
})
.collect();
trace!("retransmit orders {}", orders.len());
let errs: Vec<_> = orders
.par_iter()
.map(|v| {
Expand Down Expand Up @@ -757,6 +758,11 @@ impl Crdt {
}

return Some(out);
} else {
info!(
"requested ix {} != blob_ix {}, outside window!",
ix, blob_ix
);
}
} else {
assert!(window.read().unwrap()[pos].is_none());
Expand Down
1 change: 1 addition & 0 deletions src/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ mod tests {
let server = FullNode::new_leader(
bank,
0,
None,
Some(Duration::from_millis(30)),
leader,
exit.clone(),
Expand Down
44 changes: 39 additions & 5 deletions src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

use bank::Bank;
use crdt::{Crdt, ReplicatedData, TestNode};
use entry::Entry;
use entry_writer;
use ledger::Block;
use ncp::Ncp;
use packet::BlobRecycler;
use rpu::Rpu;
use service::Service;
use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io::{sink, stdin, stdout, BufReader};
use std::io::{Read, Write};
Expand Down Expand Up @@ -51,9 +54,9 @@ impl FullNode {
};
let reader = BufReader::new(infile);
let entries = entry_writer::read_entries(reader).map(|e| e.expect("failed to parse entry"));
info!("processing ledger...");
let entry_height = bank.process_ledger(entries).expect("process_ledger");

info!("processing ledger...");
let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger");
// entry_height is the network-wide agreed height of the ledger.
// initialize it from the input ledger
info!("processed {} ledger...", entry_height);
Expand All @@ -74,6 +77,7 @@ impl FullNode {
let server = FullNode::new_validator(
bank,
entry_height,
Some(ledger_tail),
node,
network_entry_point,
exit.clone(),
Expand All @@ -100,6 +104,7 @@ impl FullNode {
let server = FullNode::new_leader(
bank,
entry_height,
Some(ledger_tail),
//Some(Duration::from_millis(1000)),
None,
node,
Expand All @@ -113,6 +118,27 @@ impl FullNode {
server
}
}

fn new_window(
ledger_tail: Option<Vec<Entry>>,
entry_height: u64,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
) -> streamer::Window {
match ledger_tail {
Some(ledger_tail) => {
// convert to blobs
let mut blobs = VecDeque::new();
ledger_tail.to_blobs(&blob_recycler, &mut blobs);

// flatten deque to vec
let blobs: Vec<_> = blobs.into_iter().collect();
streamer::initialized_window(&crdt, blobs, entry_height)
}
None => streamer::default_window(),
}
}

/// Create a server instance acting as a leader.
///
/// ```text
Expand Down Expand Up @@ -140,6 +166,7 @@ impl FullNode {
pub fn new_leader<W: Write + Send + 'static>(
bank: Bank,
entry_height: u64,
ledger_tail: Option<Vec<Entry>>,
tick_duration: Option<Duration>,
node: TestNode,
exit: Arc<AtomicBool>,
Expand All @@ -166,7 +193,9 @@ impl FullNode {
);
thread_hdls.extend(tpu.thread_hdls());
let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
let window = streamer::default_window();

let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler);

let ncp = Ncp::new(
crdt.clone(),
window.clone(),
Expand Down Expand Up @@ -221,6 +250,7 @@ impl FullNode {
pub fn new_validator(
bank: Bank,
entry_height: u64,
ledger_tail: Option<Vec<Entry>>,
node: TestNode,
entry_point: ReplicatedData,
exit: Arc<AtomicBool>,
Expand All @@ -239,7 +269,11 @@ impl FullNode {
crdt.write()
.expect("'crdt' write lock before insert() in pub fn replicate")
.insert(&entry_point);
let window = streamer::default_window();

let blob_recycler = BlobRecycler::default();

let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler);

let ncp = Ncp::new(
crdt.clone(),
window.clone(),
Expand Down Expand Up @@ -292,7 +326,7 @@ mod tests {
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
let v = FullNode::new_validator(bank, 0, tn, entry, exit.clone());
let v = FullNode::new_validator(bank, 0, None, tn, entry, exit.clone());
exit.store(true, Ordering::Relaxed);
for t in v.thread_hdls {
t.join().unwrap();
Expand Down
38 changes: 38 additions & 0 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,44 @@ pub fn default_window() -> Window {
Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize]))
}

/// Initialize a rebroadcast window with most recent Entry blobs

This comment was marked as resolved.

/// * `crdt` - gossip instance, used to set blob ids
/// * `blobs` - up to WINDOW_SIZE most recent blobs
/// * `entry_height` - current entry height
pub fn initialized_window(
crdt: &Arc<RwLock<Crdt>>,
blobs: Vec<SharedBlob>,
entry_height: u64,
) -> Window {
let window = default_window();

{
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());

debug!(
"initialized window entry_height:{} blobs_len:{}",
entry_height,
blobs.len()
);

// Index the blobs
let mut received = entry_height - blobs.len() as u64;
Crdt::index_blobs(crdt, &blobs, &mut received).expect("index blobs for initial window");

// populate the window, offset by implied index
for b in blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
trace!("caching {} at {}", ix, pos);
assert!(win[pos].is_none());
win[pos] = Some(b);
}
}

window
}

pub fn window(
crdt: Arc<RwLock<Crdt>>,
window: Window,
Expand Down
Loading