support an initial window filled with last up-to-WINDOW_SIZE blobs
rob-solana committed Jul 9, 2018
1 parent 71f05cb commit 2577467
Showing 7 changed files with 164 additions and 36 deletions.
98 changes: 71 additions & 27 deletions src/bank.rs
@@ -19,6 +19,7 @@ use std::result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
use std::time::Instant;
use streamer::WINDOW_SIZE;
use timing::duration_as_us;
use transaction::{Instruction, Plan, Transaction};

@@ -306,10 +307,7 @@ impl Bank {
}

/// Process an ordered list of entries.
pub fn process_entries<I>(&self, entries: I) -> Result<u64>
where
I: IntoIterator<Item = Entry>,
{
pub fn process_entries(&self, entries: Vec<Entry>) -> Result<u64> {
let mut entry_count = 0;
for entry in entries {
entry_count += 1;
@@ -348,7 +346,7 @@ impl Bank {
}

/// Process a full ledger.
pub fn process_ledger<I>(&self, entries: I) -> Result<u64>
pub fn process_ledger<I>(&self, entries: I) -> Result<(u64, Vec<Entry>)>
where
I: IntoIterator<Item = Entry>,
{
@@ -364,20 +362,39 @@
let entry1 = entries
.next()
.expect("invalid ledger: need at least 2 entries");
let tx = &entry1.transactions[0];
let deposit = if let Instruction::NewContract(contract) = &tx.instruction {
contract.plan.final_payment()
} else {
None
}.expect("invalid ledger, needs to start with a contract");
{
let tx = &entry1.transactions[0];
let deposit = if let Instruction::NewContract(contract) = &tx.instruction {
contract.plan.final_payment()
} else {
None
}.expect("invalid ledger, needs to start with a contract");

self.apply_payment(&deposit, &mut self.balances.write().unwrap());
self.apply_payment(&deposit, &mut self.balances.write().unwrap());
}
self.register_entry_id(&entry0.id);
self.register_entry_id(&entry1.id);

let mut entry_count = 2;
entry_count += self.process_blocks(entries)?;
Ok(entry_count)
let mut tail = Vec::with_capacity(WINDOW_SIZE as usize);
let mut next = Vec::with_capacity(WINDOW_SIZE as usize);

for block in &entries.into_iter().chunks(WINDOW_SIZE as usize) {
tail = next;
next = block.collect();
entry_count += self.process_blocks(next.clone())?;
}

tail.append(&mut next);

if tail.len() < WINDOW_SIZE as usize {
tail.insert(0, entry1);
if tail.len() < WINDOW_SIZE as usize {
tail.insert(0, entry0);
}
}

Ok((entry_count, tail))
}
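
The chunking loop above only ever keeps the previous block and the current block, so after the final append the tail holds at most the last two blocks of entries and always contains the most recent up-to-WINDOW_SIZE of them. Below is a minimal standalone sketch of that accumulation idea, not the Bank implementation itself: it assumes the itertools crate, uses plain u64 items instead of Entry, and substitutes a hypothetical window size of 4.

// Sketch of the tail accumulation: keep the previous chunk plus the current
// chunk so the last WINDOW_SIZE items always survive the loop.
// WINDOW_SIZE here is a hypothetical stand-in for the streamer constant.
use itertools::Itertools;

const WINDOW_SIZE: usize = 4;

fn last_window(entries: impl IntoIterator<Item = u64>) -> Vec<u64> {
    let mut tail: Vec<u64> = Vec::with_capacity(WINDOW_SIZE);
    let mut next: Vec<u64> = Vec::with_capacity(WINDOW_SIZE);

    for block in &entries.into_iter().chunks(WINDOW_SIZE) {
        tail = next;            // previous (full) chunk
        next = block.collect(); // current, possibly partial, chunk
    }
    tail.append(&mut next);     // at most 2 * WINDOW_SIZE items

    tail
}

fn main() {
    // Ten entries with a window of 4: the last two chunks survive.
    assert_eq!(last_window(0..10), (4..10).collect::<Vec<u64>>());
}

The real function additionally prepends the two genesis entries when the collected tail is shorter than WINDOW_SIZE, so callers always receive a usable window seed.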

/// Process a Witness Signature. Any payment plans waiting on this signature
@@ -483,9 +500,9 @@ mod tests {
use super::*;
use bincode::serialize;
use entry::next_entry;
use entry::Entry;
use entry_writer::{self, EntryWriter};
use hash::hash;
use ledger::next_entries;
use signature::KeyPairUtil;
use std::io::{BufReader, Cursor, Seek, SeekFrom};

@@ -720,25 +737,52 @@ mod tests {
assert_eq!(bank.get_balance(&mint.pubkey()), 1);
}

fn create_sample_block(mint: &Mint) -> impl Iterator<Item = Entry> {
let keypair = KeyPair::new();
let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id());
next_entries(&mint.last_id(), 0, vec![tx]).into_iter()
fn create_sample_block(mint: &Mint, length: usize) -> impl Iterator<Item = Entry> {
let mut entries = Vec::with_capacity(length);
let mut hash = mint.last_id();
let mut cur_hashes = 0;
for _ in 0..length {
let keypair = KeyPair::new();
let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id());
let entry = Entry::new_mut(&mut hash, &mut cur_hashes, vec![tx], false);
entries.push(entry);
}
entries.into_iter()
}

fn create_sample_ledger() -> (impl Iterator<Item = Entry>, PublicKey) {
let mint = Mint::new(2);
fn create_sample_ledger(length: usize) -> (impl Iterator<Item = Entry>, PublicKey) {
let mint = Mint::new(1 + length as i64);
let genesis = mint.create_entries();
let block = create_sample_block(&mint);
let block = create_sample_block(&mint, length);
(genesis.into_iter().chain(block), mint.pubkey())
}

#[test]
fn test_process_ledger() {
let (ledger, pubkey) = create_sample_ledger();
let (ledger, pubkey) = create_sample_ledger(1);
let (ledger, dup) = ledger.tee();
let bank = Bank::default();
bank.process_ledger(ledger).unwrap();
let (ledger_height, tail) = bank.process_ledger(ledger).unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, 3);
assert_eq!(tail.len(), 3);
assert_eq!(tail, dup.collect_vec());
let last_entry = &tail[tail.len() - 1];
assert_eq!(bank.last_id(), last_entry.id);
}

#[test]
fn test_process_ledger_around_window_size() {
let window_size = WINDOW_SIZE as usize;
for entry_count in window_size - 1..window_size + 1 {
let (ledger, pubkey) = create_sample_ledger(entry_count);
let bank = Bank::default();
let (ledger_height, tail) = bank.process_ledger(ledger).unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, entry_count as u64 + 2);
assert!(tail.len() <= window_size);
let last_entry = &tail[tail.len() - 1];
assert_eq!(bank.last_id(), last_entry.id);
}
}

// Write the given entries to a file and then return a file iterator to them.
@@ -753,7 +797,7 @@

#[test]
fn test_process_ledger_from_file() {
let (ledger, pubkey) = create_sample_ledger();
let (ledger, pubkey) = create_sample_ledger(1);
let ledger = to_file_iter(ledger);

let bank = Bank::default();
@@ -765,7 +809,7 @@
fn test_process_ledger_from_files() {
let mint = Mint::new(2);
let genesis = to_file_iter(mint.create_entries().into_iter());
let block = to_file_iter(create_sample_block(&mint));
let block = to_file_iter(create_sample_block(&mint, 1));

let bank = Bank::default();
bank.process_ledger(genesis.chain(block)).unwrap();
14 changes: 10 additions & 4 deletions src/crdt.rs
@@ -388,7 +388,7 @@ impl Crdt {
//filter myself
false
} else if v.replicate_addr == daddr {
//filter nodes that are not listening
trace!("broadcast skip not listening {:x}", v.debug_id());
false
} else {
trace!("broadcast node {}", v.replicate_addr);
@@ -400,7 +400,7 @@
warn!("crdt too small");
Err(CrdtError::TooSmall)?;
}
trace!("nodes table {}", nodes.len());
trace!("broadcast nodes {}", nodes.len());

// enumerate all the blobs in the window, those are the indices
// transmit them to nodes, starting from a different node
@@ -414,7 +414,7 @@
orders.push((window_l[k].clone(), nodes[is % nodes.len()]));
}

trace!("orders table {}", orders.len());
trace!("broadcast orders table {}", orders.len());
let errs: Vec<_> = orders
.into_iter()
.map(|(b, v)| {
@@ -471,13 +471,14 @@ impl Crdt {
trace!("skip retransmit to leader {:?}", v.id);
false
} else if v.replicate_addr == daddr {
trace!("skip nodes that are not listening {:?}", v.id);
trace!("retransmit skip not listening {:x}", v.debug_id());
false
} else {
true
}
})
.collect();
trace!("retransmit orders {}", orders.len());
let errs: Vec<_> = orders
.par_iter()
.map(|v| {
@@ -757,6 +758,11 @@ impl Crdt {
}

return Some(out);
} else {
info!(
"requested ix {} != blob_ix {}, outside window!",
ix, blob_ix
);
}
} else {
assert!(window.read().unwrap()[pos].is_none());
1 change: 1 addition & 0 deletions src/drone.rs
@@ -273,6 +273,7 @@ mod tests {
let server = FullNode::new_leader(
bank,
0,
None,
Some(Duration::from_millis(30)),
leader,
exit.clone(),
44 changes: 39 additions & 5 deletions src/fullnode.rs
@@ -2,11 +2,14 @@

use bank::Bank;
use crdt::{Crdt, ReplicatedData, TestNode};
use entry::Entry;
use entry_writer;
use ledger::Block;
use ncp::Ncp;
use packet::BlobRecycler;
use rpu::Rpu;
use service::Service;
use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io::{sink, stdin, stdout, BufReader};
use std::io::{Read, Write};
@@ -51,9 +54,9 @@ impl FullNode {
};
let reader = BufReader::new(infile);
let entries = entry_writer::read_entries(reader).map(|e| e.expect("failed to parse entry"));
info!("processing ledger...");
let entry_height = bank.process_ledger(entries).expect("process_ledger");

info!("processing ledger...");
let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger");
// entry_height is the network-wide agreed height of the ledger.
// initialize it from the input ledger
info!("processed {} ledger...", entry_height);
@@ -74,6 +77,7 @@
let server = FullNode::new_validator(
bank,
entry_height,
Some(ledger_tail),
node,
network_entry_point,
exit.clone(),
@@ -100,6 +104,7 @@
let server = FullNode::new_leader(
bank,
entry_height,
Some(ledger_tail),
//Some(Duration::from_millis(1000)),
None,
node,
@@ -113,6 +118,27 @@
server
}
}

fn new_window(
ledger_tail: Option<Vec<Entry>>,
entry_height: u64,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
) -> streamer::Window {
match ledger_tail {
Some(ledger_tail) => {
// convert to blobs
let mut blobs = VecDeque::new();
ledger_tail.to_blobs(&blob_recycler, &mut blobs);

// flatten deque to vec
let blobs: Vec<_> = blobs.into_iter().collect();
streamer::initialized_window(&crdt, blobs, entry_height)
}
None => streamer::default_window(),
}
}

/// Create a server instance acting as a leader.
///
/// ```text
@@ -140,6 +166,7 @@ impl FullNode {
pub fn new_leader<W: Write + Send + 'static>(
bank: Bank,
entry_height: u64,
ledger_tail: Option<Vec<Entry>>,
tick_duration: Option<Duration>,
node: TestNode,
exit: Arc<AtomicBool>,
@@ -166,7 +193,9 @@
);
thread_hdls.extend(tpu.thread_hdls());
let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
let window = streamer::default_window();

let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler);

let ncp = Ncp::new(
crdt.clone(),
window.clone(),
@@ -221,6 +250,7 @@
pub fn new_validator(
bank: Bank,
entry_height: u64,
ledger_tail: Option<Vec<Entry>>,
node: TestNode,
entry_point: ReplicatedData,
exit: Arc<AtomicBool>,
@@ -239,7 +269,11 @@
crdt.write()
.expect("'crdt' write lock before insert() in pub fn replicate")
.insert(&entry_point);
let window = streamer::default_window();

let blob_recycler = BlobRecycler::default();

let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler);

let ncp = Ncp::new(
crdt.clone(),
window.clone(),
@@ -292,7 +326,7 @@ mod tests {
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
let v = FullNode::new_validator(bank, 0, tn, entry, exit.clone());
let v = FullNode::new_validator(bank, 0, None, tn, entry, exit.clone());
exit.store(true, Ordering::Relaxed);
for t in v.thread_hdls {
t.join().unwrap();
38 changes: 38 additions & 0 deletions src/streamer.rs
@@ -462,6 +462,44 @@ pub fn default_window() -> Window {
Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize]))
}

/// Initialize a rebroadcast window with most recent Entry blobs
/// * `crdt` - gossip instance, used to set blob ids
/// * `blobs` - up to WINDOW_SIZE most recent blobs
/// * `entry_height` - current entry height
pub fn initialized_window(
crdt: &Arc<RwLock<Crdt>>,
blobs: Vec<SharedBlob>,
entry_height: u64,
) -> Window {
let window = default_window();

{
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());

debug!(
"initialized window entry_height:{} blobs_len:{}",
entry_height,
blobs.len()
);

// Index the blobs
let mut received = entry_height - blobs.len() as u64;
Crdt::index_blobs(crdt, &blobs, &mut received).expect("index blobs for initial window");

// populate the window, offset by implied index
for b in blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
trace!("caching {} at {}", ix, pos);
assert!(win[pos].is_none());
win[pos] = Some(b);
}
}

window
}
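
The slot arithmetic above is the only subtle part of the seeding: the oldest cached blob has index entry_height minus the number of blobs, and each blob lands in slot index % WINDOW_SIZE, so a freshly seeded window can wrap around rather than starting at slot 0. A small standalone sketch of just that arithmetic follows; it is not the streamer code itself, and uses plain indices instead of blobs and a hypothetical window size of 8.

// Sketch of the seeding arithmetic in initialized_window: the first cached
// index is entry_height - blobs_len, and each index maps to slot ix % WINDOW_SIZE.
// WINDOW_SIZE here is a hypothetical stand-in for the streamer constant.
const WINDOW_SIZE: u64 = 8;

fn initial_slots(entry_height: u64, blobs_len: u64) -> Vec<(u64, usize)> {
    let first = entry_height - blobs_len; // index of the oldest cached blob
    (first..entry_height)
        .map(|ix| (ix, (ix % WINDOW_SIZE) as usize))
        .collect()
}

fn main() {
    // Three blobs cached at entry_height 10 occupy slots 7, 0 and 1.
    assert_eq!(initial_slots(10, 3), vec![(7, 7), (8, 0), (9, 1)]);
}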

pub fn window(
crdt: Arc<RwLock<Crdt>>,
window: Window,
