From ccb478c1f6c53066a6419c60cb9fb196d0315713 Mon Sep 17 00:00:00 2001 From: Robert Kelly Date: Tue, 1 May 2018 16:22:33 -0400 Subject: [PATCH 1/7] improved error handling and atomic transactions --- src/accountant.rs | 49 +++++++++++++++++++++++++++++++----------- src/accountant_skel.rs | 13 ++++++++--- 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index 683474efa74606..472cf2f19cd769 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -14,6 +14,7 @@ use rayon::prelude::*; use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet, VecDeque}; +use std::sync::atomic::{AtomicIsize, Ordering}; use std::result; use std::sync::RwLock; use transaction::Transaction; @@ -23,6 +24,7 @@ pub const MAX_ENTRY_IDS: usize = 1024 * 4; #[derive(Debug, PartialEq, Eq)] pub enum AccountingError { AccountNotFound, + BalanceUpdatedBeforeTransactionCompleted, InsufficientFunds, InvalidTransferSignature, } @@ -30,18 +32,18 @@ pub enum AccountingError { pub type Result = result::Result; /// Commit funds to the 'to' party. -fn apply_payment(balances: &RwLock>>, payment: &Payment) { +fn apply_payment(balances: &RwLock>, payment: &Payment) { if balances.read().unwrap().contains_key(&payment.to) { let bals = balances.read().unwrap(); - *bals[&payment.to].write().unwrap() += payment.tokens; + bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed); } else { let mut bals = balances.write().unwrap(); - bals.insert(payment.to, RwLock::new(payment.tokens)); + bals.insert(payment.to, AtomicIsize::new(payment.tokens as isize)); } } pub struct Accountant { - balances: RwLock>>, + balances: RwLock>, pending: RwLock>, last_ids: RwLock>)>>, time_sources: RwLock>, @@ -131,23 +133,34 @@ impl Accountant { // Hold a write lock before the condition check, so that a debit can't occur // between checking the balance and the withdraw. let option = bals.get(&tr.from); + if option.is_none() { return Err(AccountingError::AccountNotFound); } - let mut bal = option.unwrap().write().unwrap(); if !self.reserve_signature_with_last_id(&tr.sig, &tr.data.last_id) { return Err(AccountingError::InvalidTransferSignature); } - if *bal < tr.data.tokens { + let bal = option.unwrap(); + let current = bal.load(Ordering::Relaxed) as i64; + + if current < tr.data.tokens { self.forget_signature_with_last_id(&tr.sig, &tr.data.last_id); return Err(AccountingError::InsufficientFunds); } - *bal -= tr.data.tokens; + let result = bal.compare_exchange( + current as isize, + (current - tr.data.tokens) as isize, + Ordering::Relaxed, + Ordering::Relaxed, + ); - Ok(()) + return match result { + Ok(_) => Ok(()), + Err(_) => Err(AccountingError::BalanceUpdatedBeforeTransactionCompleted), + } } pub fn process_verified_transaction_credits(&self, tr: &Transaction) { @@ -164,9 +177,15 @@ impl Accountant { /// Process a Transaction that has already been verified. pub fn process_verified_transaction(&self, tr: &Transaction) -> Result<()> { - self.process_verified_transaction_debits(tr)?; - self.process_verified_transaction_credits(tr); - Ok(()) + return match self.process_verified_transaction_debits(tr) { + Ok(_) => { + self.process_verified_transaction_credits(tr); + Ok(()) + }, + Err(err) => { + Err(err) + } + }; } /// Process a batch of verified transactions. @@ -174,7 +193,11 @@ impl Accountant { // Run all debits first to filter out any transactions that can't be processed // in parallel deterministically. let results: Vec<_> = trs.into_par_iter() - .map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr)) + .filter_map(|tr| match self.process_verified_transaction_debits(&tr) { + Ok(_x) => Some(Ok(tr)), + Err(_e) => None, + }) + // .flat_map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr)) .collect(); // Calling collect() here forces all debits to complete before moving on. results @@ -300,7 +323,7 @@ impl Accountant { pub fn get_balance(&self, pubkey: &PublicKey) -> Option { let bals = self.balances.read().unwrap(); - bals.get(pubkey).map(|x| *x.read().unwrap()) + bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64) } } diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 9712c6c031cab5..598cc54e58e492 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -757,6 +757,7 @@ mod bench { // Create transactions between unrelated parties. let txs = 100_000; let last_ids: Mutex> = Mutex::new(HashSet::new()); + let errors: Mutex = Mutex::new(0); let transactions: Vec<_> = (0..txs) .into_par_iter() .map(|i| { @@ -774,11 +775,17 @@ mod bench { // Seed the 'from' account. let rando0 = KeyPair::new(); let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); - acc.process_verified_transaction(&tr).unwrap(); + // some of these will fail because balance updates before transaction completes + match acc.process_verified_transaction(&tr) { + Ok(_) => (), + Err(_) => *errors.lock().unwrap() += 1, + }; let rando1 = KeyPair::new(); let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); - acc.process_verified_transaction(&tr).unwrap(); + // these will fail if the prior transaction does not go through + // but won't typically fail otherwise since the addresses are randomly generated + let _ = acc.process_verified_transaction(&tr); // Finally, return a transaction that's unique Transaction::new(&rando0, rando1.pubkey(), 1, last_id) @@ -803,7 +810,7 @@ mod bench { drop(skel.historian.sender); let entries: Vec = skel.historian.receiver.iter().collect(); assert_eq!(entries.len(), 1); - assert_eq!(entries[0].events.len(), txs as usize); + assert_eq!(entries[0].events.len() + *errors.lock().unwrap(), txs as usize); println!("{} tps", tps); } From cb362e9052acdad9f22aa1893a75b1aa10f8b9b8 Mon Sep 17 00:00:00 2001 From: Robert Kelly Date: Tue, 1 May 2018 16:38:08 -0400 Subject: [PATCH 2/7] rust format --- src/accountant.rs | 6 ++---- src/accountant_skel.rs | 9 ++++++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index 472cf2f19cd769..3be68a8d76be7c 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -160,7 +160,7 @@ impl Accountant { return match result { Ok(_) => Ok(()), Err(_) => Err(AccountingError::BalanceUpdatedBeforeTransactionCompleted), - } + }; } pub fn process_verified_transaction_credits(&self, tr: &Transaction) { @@ -181,10 +181,8 @@ impl Accountant { Ok(_) => { self.process_verified_transaction_credits(tr); Ok(()) - }, - Err(err) => { - Err(err) } + Err(err) => Err(err), }; } diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 598cc54e58e492..0c31c2b20adc7d 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -777,8 +777,8 @@ mod bench { let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); // some of these will fail because balance updates before transaction completes match acc.process_verified_transaction(&tr) { - Ok(_) => (), - Err(_) => *errors.lock().unwrap() += 1, + Ok(_) => (), + Err(_) => *errors.lock().unwrap() += 1, }; let rando1 = KeyPair::new(); @@ -810,7 +810,10 @@ mod bench { drop(skel.historian.sender); let entries: Vec = skel.historian.receiver.iter().collect(); assert_eq!(entries.len(), 1); - assert_eq!(entries[0].events.len() + *errors.lock().unwrap(), txs as usize); + assert_eq!( + entries[0].events.len() + *errors.lock().unwrap(), + txs as usize + ); println!("{} tps", tps); } From b992a84d67d6e20ca7d1b18e8e68c3d480dfe054 Mon Sep 17 00:00:00 2001 From: Robert Kelly Date: Wed, 2 May 2018 10:15:08 -0400 Subject: [PATCH 3/7] modified verification to loop until success or failure --- src/accountant.rs | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index 3be68a8d76be7c..1f92ef844ce31c 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -24,7 +24,6 @@ pub const MAX_ENTRY_IDS: usize = 1024 * 4; #[derive(Debug, PartialEq, Eq)] pub enum AccountingError { AccountNotFound, - BalanceUpdatedBeforeTransactionCompleted, InsufficientFunds, InvalidTransferSignature, } @@ -142,25 +141,27 @@ impl Accountant { return Err(AccountingError::InvalidTransferSignature); } - let bal = option.unwrap(); - let current = bal.load(Ordering::Relaxed) as i64; + loop { + let bal = option.unwrap(); + let current = bal.load(Ordering::Relaxed) as i64; - if current < tr.data.tokens { - self.forget_signature_with_last_id(&tr.sig, &tr.data.last_id); - return Err(AccountingError::InsufficientFunds); - } + if current < tr.data.tokens { + self.forget_signature_with_last_id(&tr.sig, &tr.data.last_id); + return Err(AccountingError::InsufficientFunds); + } - let result = bal.compare_exchange( - current as isize, - (current - tr.data.tokens) as isize, - Ordering::Relaxed, - Ordering::Relaxed, - ); + let result = bal.compare_exchange( + current as isize, + (current - tr.data.tokens) as isize, + Ordering::Relaxed, + Ordering::Relaxed, + ); - return match result { - Ok(_) => Ok(()), - Err(_) => Err(AccountingError::BalanceUpdatedBeforeTransactionCompleted), - }; + match result { + Ok(_) => return Ok(()), + Err(_) => continue, + }; + } } pub fn process_verified_transaction_credits(&self, tr: &Transaction) { From 6b45d453b8ef6cf48259a3b68d34a148d4af1984 Mon Sep 17 00:00:00 2001 From: Robert Kelly Date: Wed, 2 May 2018 10:44:41 -0400 Subject: [PATCH 4/7] modified verification map --- src/accountant.rs | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index 1f92ef844ce31c..735798f9f76dd4 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -128,9 +128,6 @@ impl Accountant { /// funds and isn't a duplicate. pub fn process_verified_transaction_debits(&self, tr: &Transaction) -> Result<()> { let bals = self.balances.read().unwrap(); - - // Hold a write lock before the condition check, so that a debit can't occur - // between checking the balance and the withdraw. let option = bals.get(&tr.from); if option.is_none() { @@ -178,13 +175,9 @@ impl Accountant { /// Process a Transaction that has already been verified. pub fn process_verified_transaction(&self, tr: &Transaction) -> Result<()> { - return match self.process_verified_transaction_debits(tr) { - Ok(_) => { - self.process_verified_transaction_credits(tr); - Ok(()) - } - Err(err) => Err(err), - }; + self.process_verified_transaction_debits(tr)?; + self.process_verified_transaction_credits(tr); + Ok(()) } /// Process a batch of verified transactions. @@ -192,11 +185,7 @@ impl Accountant { // Run all debits first to filter out any transactions that can't be processed // in parallel deterministically. let results: Vec<_> = trs.into_par_iter() - .filter_map(|tr| match self.process_verified_transaction_debits(&tr) { - Ok(_x) => Some(Ok(tr)), - Err(_e) => None, - }) - // .flat_map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr)) + .map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr)) .collect(); // Calling collect() here forces all debits to complete before moving on. results From d0151d2b79a2221d73a7a9018d5fa5b905e2b7af Mon Sep 17 00:00:00 2001 From: Robert Kelly Date: Wed, 2 May 2018 12:04:05 -0400 Subject: [PATCH 5/7] restored original test logic --- src/accountant_skel.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 0c31c2b20adc7d..c5969fe0974c4c 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -757,7 +757,6 @@ mod bench { // Create transactions between unrelated parties. let txs = 100_000; let last_ids: Mutex> = Mutex::new(HashSet::new()); - let errors: Mutex = Mutex::new(0); let transactions: Vec<_> = (0..txs) .into_par_iter() .map(|i| { @@ -775,16 +774,10 @@ mod bench { // Seed the 'from' account. let rando0 = KeyPair::new(); let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); - // some of these will fail because balance updates before transaction completes - match acc.process_verified_transaction(&tr) { - Ok(_) => (), - Err(_) => *errors.lock().unwrap() += 1, - }; + let _ = acc.process_verified_transaction(&tr); let rando1 = KeyPair::new(); let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); - // these will fail if the prior transaction does not go through - // but won't typically fail otherwise since the addresses are randomly generated let _ = acc.process_verified_transaction(&tr); // Finally, return a transaction that's unique @@ -811,7 +804,7 @@ mod bench { let entries: Vec = skel.historian.receiver.iter().collect(); assert_eq!(entries.len(), 1); assert_eq!( - entries[0].events.len() + *errors.lock().unwrap(), + entries[0].events.len(), txs as usize ); From cc6de605ace8adad853cfefd2eba26dd6224940b Mon Sep 17 00:00:00 2001 From: Robert Kelly Date: Wed, 2 May 2018 12:21:20 -0400 Subject: [PATCH 6/7] rustfmt --- src/accountant.rs | 2 +- src/accountant_skel.rs | 23 ++++++++++------------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index 735798f9f76dd4..49c5d2c8838042 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -14,8 +14,8 @@ use rayon::prelude::*; use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::atomic::{AtomicIsize, Ordering}; use std::result; +use std::sync::atomic::{AtomicIsize, Ordering}; use std::sync::RwLock; use transaction::Transaction; diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index c5969fe0974c4c..65a0ddd097a4e5 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -487,14 +487,14 @@ mod tests { use std::time::Duration; use transaction::Transaction; - use subscribers::{Node, Subscribers}; - use streamer; - use std::sync::mpsc::channel; - use std::collections::VecDeque; - use hash::{hash, Hash}; - use event::Event; - use entry; use chrono::prelude::*; + use entry; + use event::Event; + use hash::{hash, Hash}; + use std::collections::VecDeque; + use std::sync::mpsc::channel; + use streamer; + use subscribers::{Node, Subscribers}; #[test] fn test_layout() { @@ -774,11 +774,11 @@ mod bench { // Seed the 'from' account. let rando0 = KeyPair::new(); let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); - let _ = acc.process_verified_transaction(&tr); + acc.process_verified_transaction(&tr).unwrap(); let rando1 = KeyPair::new(); let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); - let _ = acc.process_verified_transaction(&tr); + acc.process_verified_transaction(&tr).unwrap(); // Finally, return a transaction that's unique Transaction::new(&rando0, rando1.pubkey(), 1, last_id) @@ -803,10 +803,7 @@ mod bench { drop(skel.historian.sender); let entries: Vec = skel.historian.receiver.iter().collect(); assert_eq!(entries.len(), 1); - assert_eq!( - entries[0].events.len(), - txs as usize - ); + assert_eq!(entries[0].events.len(), txs as usize); println!("{} tps", tps); } From 63cf6363a2424b61c5649e443135b7c3dda5c6bf Mon Sep 17 00:00:00 2001 From: Robert Kelly Date: Wed, 2 May 2018 12:24:25 -0400 Subject: [PATCH 7/7] more rustfmt --- src/packet.rs | 2 +- src/result.rs | 2 +- src/streamer.rs | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/packet.rs b/src/packet.rs index d97b261e9f0fc8..c4b09eb56edd2f 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -4,9 +4,9 @@ use result::{Error, Result}; use std::collections::VecDeque; use std::fmt; use std::io; +use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::sync::{Arc, Mutex, RwLock}; -use std::mem::size_of; pub type SharedPackets = Arc>; pub type SharedBlob = Arc>; diff --git a/src/result.rs b/src/result.rs index 01872dfbe1138c..532a64c3b2b9ef 100644 --- a/src/result.rs +++ b/src/result.rs @@ -1,10 +1,10 @@ //! The `result` module exposes a Result type that propagates one of many different Error types. +use accountant; use bincode; use serde_json; use std; use std::any::Any; -use accountant; #[derive(Debug)] pub enum Error { diff --git a/src/streamer.rs b/src/streamer.rs index 43e6f2ac353216..7f0e7fbdba2ffd 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -382,8 +382,9 @@ mod test { use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::time::Duration; - use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver, - PacketReceiver}; + use streamer::{ + blob_receiver, receiver, responder, retransmitter, window, BlobReceiver, PacketReceiver, + }; use subscribers::{Node, Subscribers}; fn get_msgs(r: PacketReceiver, num: &mut usize) {