Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
...
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-solana committed Jun 27, 2018
1 parent 3ed2572 commit 2a82a1f
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 53 deletions.
16 changes: 4 additions & 12 deletions src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ pub struct Bank {
/// The number of transactions the bank has processed without error since the
/// start of the ledger.
transaction_count: AtomicUsize,

/// The number of Entries the bank has processed without error since start
/// of the ledger, i.e. poor-man's network synchronization
/// TODO: upgrade to U64 when stable?
entry_count: AtomicUsize,
}

impl Bank {
Expand All @@ -105,7 +100,6 @@ impl Bank {
time_sources: RwLock::new(HashSet::new()),
last_time: RwLock::new(Utc.timestamp(0, 0)),
transaction_count: AtomicUsize::new(0),
entry_count: AtomicUsize::new(0),
};
bank.apply_payment(deposit, &mut bank.balances.write().unwrap());
bank
Expand Down Expand Up @@ -302,12 +296,13 @@ impl Bank {
}

/// Process an ordered list of entries.
pub fn process_entries<I>(&self, entries: I) -> Result<usize>
pub fn process_entries<I>(&self, entries: I) -> Result<u64>
where
I: IntoIterator<Item = Entry>,
{
let mut entry_count = 0;
for entry in entries {
self.entry_count.fetch_add(1, Ordering::Relaxed);
entry_count += 1;

if !entry.transactions.is_empty() {
for result in self.process_transactions(entry.transactions) {
Expand All @@ -321,7 +316,7 @@ impl Bank {
self.register_entry_id(&entry.id);
}
}
Ok(self.entry_count())
Ok(entry_count)
}

/// Process a Witness Signature. Any payment plans waiting on this signature
Expand Down Expand Up @@ -435,9 +430,6 @@ impl Bank {
pub fn transaction_count(&self) -> usize {
self.transaction_count.load(Ordering::Relaxed)
}
pub fn entry_count(&self) -> usize {
self.entry_count.load(Ordering::Relaxed)
}
}

#[cfg(test)]
Expand Down
8 changes: 6 additions & 2 deletions src/bin/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ fn main() {
bank.register_entry_id(&entry0.id);
bank.register_entry_id(&entry1.id);

// entry_height is the network-wide agreed height of the ledger.
// initialize it from the input ledger
eprintln!("processing entries...");
let num_entries = bank.process_entries(entries).expect("process_entries");
eprintln!("processed {} entries...", num_entries);
let entry_height = bank.process_entries(entries).expect("process_entries");
eprintln!("processed {} entries...", entry_height);

eprintln!("creating networking stack...");

Expand Down Expand Up @@ -135,6 +137,7 @@ fn main() {
let newtwork_entry_point = ReplicatedData::new_entry_point(testnet_addr);
let s = Server::new_validator(
bank,
entry_height,
repl_data.clone(),
UdpSocket::bind(repl_data.requests_addr).unwrap(),
UdpSocket::bind("0.0.0.0:0").unwrap(),
Expand All @@ -160,6 +163,7 @@ fn main() {

let server = Server::new_leader(
bank,
entry_height,
//Some(Duration::from_millis(1000)),
None,
repl_data.clone(),
Expand Down
5 changes: 4 additions & 1 deletion src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,10 @@ impl Crdt {
}
} else {
assert!(window.read().unwrap()[pos].is_none());
info!("failed RequestWindowIndex {} {}", ix, from.repair_addr);
info!(
"failed RequestWindowIndex {} {} {}",
ix, pos, from.repair_addr
);
}

None
Expand Down
1 change: 1 addition & 0 deletions src/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ mod tests {

let server = Server::new_leader(
bank,
0,
Some(Duration::from_millis(30)),
leader.data.clone(),
leader.sockets.requests,
Expand Down
6 changes: 5 additions & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl Server {
/// ```
pub fn new_leader<W: Write + Send + 'static>(
bank: Bank,
entry_height: u64,
tick_duration: Option<Duration>,
me: ReplicatedData,
requests_socket: UdpSocket,
Expand Down Expand Up @@ -89,9 +90,9 @@ impl Server {
exit.clone(),
crdt,
window,
entry_height,
blob_recycler.clone(),
tpu.blob_receiver,
bank.entry_count(),
);
thread_hdls.extend(vec![t_broadcast]);

Expand Down Expand Up @@ -129,6 +130,7 @@ impl Server {
/// ```
pub fn new_validator(
bank: Bank,
entry_height: u64,
me: ReplicatedData,
requests_socket: UdpSocket,
respond_socket: UdpSocket,
Expand Down Expand Up @@ -160,6 +162,7 @@ impl Server {

let tvu = Tvu::new(
bank.clone(),
entry_height,
crdt.clone(),
window.clone(),
replicate_socket,
Expand Down Expand Up @@ -188,6 +191,7 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let v = Server::new_validator(
bank,
0,
tn.data.clone(),
tn.sockets.requests,
tn.sockets.respond,
Expand Down
70 changes: 36 additions & 34 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle};
use std::time::Duration;

pub const WINDOW_SIZE: usize = 2 * 1024;
pub const WINDOW_SIZE: u64 = 2 * 1024;
pub type PacketReceiver = Receiver<SharedPackets>;
pub type PacketSender = Sender<SharedPackets>;
pub type BlobSender = Sender<SharedBlobs>;
Expand Down Expand Up @@ -148,16 +148,16 @@ pub fn blob_receiver(
fn find_next_missing(
locked_window: &Window,
crdt: &Arc<RwLock<Crdt>>,
consumed: &mut usize,
received: &mut usize,
consumed: &mut u64,
received: &mut u64,
) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
if *received <= *consumed {
return Err(Error::GenericError);
}
let window = locked_window.read().unwrap();
let reqs: Vec<_> = (*consumed..*received)
.filter_map(|pix| {
let i = pix % WINDOW_SIZE;
let i = (pix % WINDOW_SIZE) as usize;
if let &None = &window[i] {
let val = crdt.read().unwrap().window_index_request(pix as u64);
if let Ok((to, req)) = val {
Expand All @@ -174,18 +174,18 @@ fn repair_window(
locked_window: &Window,
crdt: &Arc<RwLock<Crdt>>,
_recycler: &BlobRecycler,
last: &mut usize,
last: &mut u64,
times: &mut usize,
consumed: &mut usize,
received: &mut usize,
consumed: &mut u64,
received: &mut u64,
) -> Result<()> {
#[cfg(feature = "erasure")]
{
if erasure::recover(
_recycler,
&mut locked_window.write().unwrap(),
*consumed,
*received,
*consumed as usize,
*received as usize,
).is_err()
{
trace!("erasure::recover failed");
Expand Down Expand Up @@ -217,8 +217,8 @@ fn recv_window(
locked_window: &Window,
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
consumed: &mut usize,
received: &mut usize,
consumed: &mut u64,
received: &mut u64,
r: &BlobReceiver,
s: &BlobSender,
retransmit: &BlobSender,
Expand Down Expand Up @@ -273,7 +273,7 @@ fn recv_window(
while let Some(b) = dq.pop_front() {
let (pix, meta_size) = {
let p = b.write().expect("'b' write lock in fn recv_window");
(p.get_index()? as usize, p.meta.size)
(p.get_index()?, p.meta.size)
};
if pix > *received {
*received = pix;
Expand All @@ -287,7 +287,7 @@ fn recv_window(
);
continue;
}
let w = pix % WINDOW_SIZE;
let w = (pix % WINDOW_SIZE) as usize;
//TODO, after the block are authenticated
//if we get different blocks at the same index
//that is a network failure/attack
Expand All @@ -304,7 +304,7 @@ fn recv_window(
}
}
loop {
let k = *consumed % WINDOW_SIZE;
let k = (*consumed % WINDOW_SIZE) as usize;
trace!("k: {} consumed: {}", k, *consumed);
if window[k].is_none() {
break;
Expand All @@ -330,19 +330,21 @@ fn recv_window(
} else {
#[cfg(feature = "erasure")]
{
let block_start = *consumed - (*consumed % erasure::NUM_CODED);
let coding_end = block_start + erasure::NUM_CODED;
let block_start = *consumed - (*consumed % erasure::NUM_CODED as u64);
let coding_end = block_start + erasure::NUM_CODED as u64;
// We've received all this block's data blobs, go and null out the window now
for j in block_start..*consumed {
if let Some(b) = mem::replace(&mut window[j % WINDOW_SIZE], None) {
if let Some(b) =
mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None)
{
recycler.recycle(b);
}
}
for j in *consumed..coding_end {
window[j % WINDOW_SIZE] = None;
window[(j % WINDOW_SIZE) as usize] = None;
}

*consumed += erasure::MAX_MISSING;
*consumed += erasure::MAX_MISSING as u64;
debug!(
"skipping processing coding blob k: {} consumed: {}",
k, *consumed
Expand All @@ -361,15 +363,15 @@ fn recv_window(
Ok(())
}

fn print_window(locked_window: &Window, consumed: usize) {
fn print_window(locked_window: &Window, consumed: u64) {
{
let buf: Vec<_> = locked_window
.read()
.unwrap()
.iter()
.enumerate()
.map(|(i, v)| {
if i == (consumed % WINDOW_SIZE) {
if i == (consumed % WINDOW_SIZE) as usize {
"_"
} else if v.is_none() {
"0"
Expand All @@ -391,25 +393,25 @@ fn print_window(locked_window: &Window, consumed: usize) {
}

pub fn default_window() -> Window {
Arc::new(RwLock::new(vec![None; WINDOW_SIZE]))
Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize]))
}

pub fn window(
exit: Arc<AtomicBool>,
crdt: Arc<RwLock<Crdt>>,
window: Window,
entry_height: u64,
recycler: BlobRecycler,
r: BlobReceiver,
s: BlobSender,
retransmit: BlobSender,
entry_count: usize,
) -> JoinHandle<()> {
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
let mut consumed = entry_count;
let mut received = entry_count;
let mut last = entry_count;
let mut consumed = entry_height;
let mut received = entry_height;
let mut last = entry_height;
let mut times = 0;
loop {
if exit.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -459,9 +461,9 @@ fn broadcast(

// We could receive more blobs than window slots so
// break them up into window-sized chunks to process
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE).map(|x| x.to_vec());
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());

print_window(window, *receive_index as usize);
print_window(window, *receive_index);

for mut blobs in blobs_chunked {
// Insert the coding blobs into the blob stream
Expand All @@ -479,7 +481,7 @@ fn broadcast(
assert!(blobs.len() <= win.len());
for b in &blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix as usize) % WINDOW_SIZE;
let pos = (ix % WINDOW_SIZE) as usize;
if let Some(x) = mem::replace(&mut win[pos], None) {
trace!(
"popped {} at {}",
Expand All @@ -492,7 +494,7 @@ fn broadcast(
}
while let Some(b) = blobs.pop() {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix as usize) % WINDOW_SIZE;
let pos = (ix % WINDOW_SIZE) as usize;
trace!("caching {} at {}", ix, pos);
assert!(win[pos].is_none());
win[pos] = Some(b);
Expand Down Expand Up @@ -531,15 +533,15 @@ pub fn broadcaster(
exit: Arc<AtomicBool>,
crdt: Arc<RwLock<Crdt>>,
window: Window,
entry_height: u64,
recycler: BlobRecycler,
r: BlobReceiver,
entry_count: usize,
) -> JoinHandle<()> {
Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
let mut transmit_index = entry_count as u64;
let mut receive_index = entry_count as u64;
let mut transmit_index = entry_height;
let mut receive_index = entry_height;
loop {
if exit.load(Ordering::Relaxed) {
break;
Expand Down Expand Up @@ -825,11 +827,11 @@ mod test {
exit.clone(),
subs,
win,
0,
resp_recycler.clone(),
r_reader,
s_window,
s_retransmit,
0,
);
let (s_responder, r_responder) = channel();
let t_responder = responder(
Expand Down
2 changes: 2 additions & 0 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ mod tests {

let server = Server::new_leader(
bank,
0,
Some(Duration::from_millis(30)),
leader.data.clone(),
leader.sockets.requests,
Expand Down Expand Up @@ -249,6 +250,7 @@ mod tests {

let server = Server::new_leader(
bank,
0,
Some(Duration::from_millis(30)),
leader.data.clone(),
leader.sockets.requests,
Expand Down
Loading

0 comments on commit 2a82a1f

Please sign in to comment.