Skip to content

Commit

Permalink
Merge pull request #165 from rlkelly/126__atomic_balances
Browse files Browse the repository at this point in the history
126  atomic balances
  • Loading branch information
garious authored May 2, 2018
2 parents 83892f0 + 21207f6 commit 47c2317
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 26 deletions.
41 changes: 26 additions & 15 deletions src/accountant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use signature::{KeyPair, PublicKey, Signature};
use std::collections::hash_map::Entry::Occupied;
use std::collections::{HashMap, HashSet, VecDeque};
use std::result;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::RwLock;
use transaction::Transaction;

Expand All @@ -30,18 +31,18 @@ pub enum AccountingError {
pub type Result<T> = result::Result<T, AccountingError>;

/// Commit funds to the 'to' party.
fn apply_payment(balances: &RwLock<HashMap<PublicKey, RwLock<i64>>>, payment: &Payment) {
fn apply_payment(balances: &RwLock<HashMap<PublicKey, AtomicIsize>>, payment: &Payment) {
if balances.read().unwrap().contains_key(&payment.to) {
let bals = balances.read().unwrap();
*bals[&payment.to].write().unwrap() += payment.tokens;
bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed);
} else {
let mut bals = balances.write().unwrap();
bals.insert(payment.to, RwLock::new(payment.tokens));
bals.insert(payment.to, AtomicIsize::new(payment.tokens as isize));
}
}

pub struct Accountant {
balances: RwLock<HashMap<PublicKey, RwLock<i64>>>,
balances: RwLock<HashMap<PublicKey, AtomicIsize>>,
pending: RwLock<HashMap<Signature, Plan>>,
last_ids: RwLock<VecDeque<(Hash, RwLock<HashSet<Signature>>)>>,
time_sources: RwLock<HashSet<PublicKey>>,
Expand Down Expand Up @@ -127,27 +128,37 @@ impl Accountant {
/// funds and isn't a duplicate.
pub fn process_verified_transaction_debits(&self, tr: &Transaction) -> Result<()> {
let bals = self.balances.read().unwrap();

// Hold a write lock before the condition check, so that a debit can't occur
// between checking the balance and the withdraw.
let option = bals.get(&tr.from);

if option.is_none() {
return Err(AccountingError::AccountNotFound);
}
let mut bal = option.unwrap().write().unwrap();

if !self.reserve_signature_with_last_id(&tr.sig, &tr.data.last_id) {
return Err(AccountingError::InvalidTransferSignature);
}

if *bal < tr.data.tokens {
self.forget_signature_with_last_id(&tr.sig, &tr.data.last_id);
return Err(AccountingError::InsufficientFunds);
}
loop {
let bal = option.unwrap();
let current = bal.load(Ordering::Relaxed) as i64;

if current < tr.data.tokens {
self.forget_signature_with_last_id(&tr.sig, &tr.data.last_id);
return Err(AccountingError::InsufficientFunds);
}

*bal -= tr.data.tokens;
let result = bal.compare_exchange(
current as isize,
(current - tr.data.tokens) as isize,
Ordering::Relaxed,
Ordering::Relaxed,
);

Ok(())
match result {
Ok(_) => return Ok(()),
Err(_) => continue,
};
}
}

pub fn process_verified_transaction_credits(&self, tr: &Transaction) {
Expand Down Expand Up @@ -300,7 +311,7 @@ impl Accountant {

pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> {
let bals = self.balances.read().unwrap();
bals.get(pubkey).map(|x| *x.read().unwrap())
bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64)
}
}

Expand Down
14 changes: 7 additions & 7 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,14 +487,14 @@ mod tests {
use std::time::Duration;
use transaction::Transaction;

use subscribers::{Node, Subscribers};
use streamer;
use std::sync::mpsc::channel;
use std::collections::VecDeque;
use hash::{hash, Hash};
use event::Event;
use entry;
use chrono::prelude::*;
use entry;
use event::Event;
use hash::{hash, Hash};
use std::collections::VecDeque;
use std::sync::mpsc::channel;
use streamer;
use subscribers::{Node, Subscribers};

#[test]
fn test_layout() {
Expand Down
2 changes: 1 addition & 1 deletion src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use result::{Error, Result};
use std::collections::VecDeque;
use std::fmt;
use std::io;
use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use std::sync::{Arc, Mutex, RwLock};
use std::mem::size_of;

pub type SharedPackets = Arc<RwLock<Packets>>;
pub type SharedBlob = Arc<RwLock<Blob>>;
Expand Down
2 changes: 1 addition & 1 deletion src/result.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! The `result` module exposes a Result type that propagates one of many different Error types.

use accountant;
use bincode;
use serde_json;
use std;
use std::any::Any;
use accountant;

#[derive(Debug)]
pub enum Error {
Expand Down
5 changes: 3 additions & 2 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,9 @@ mod test {
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver,
PacketReceiver};
use streamer::{
blob_receiver, receiver, responder, retransmitter, window, BlobReceiver, PacketReceiver,
};
use subscribers::{Node, Subscribers};

fn get_msgs(r: PacketReceiver, num: &mut usize) {
Expand Down

0 comments on commit 47c2317

Please sign in to comment.