Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Merge pull request #86 from garious/tcp-client
Browse files Browse the repository at this point in the history
Fix up client demo
  • Loading branch information
garious authored Mar 28, 2018
2 parents 27f2901 + 849bced commit 9db42c1
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 17 deletions.
16 changes: 9 additions & 7 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use accountant::Accountant;
use bincode::{deserialize, serialize, serialize_into};
use bincode::{deserialize, serialize};
use entry::Entry;
use hash::Hash;
use result::Result;
use serde_json;
use signature::PublicKey;
use std::default::Default;
use std::io::Write;
use std::io::{ErrorKind, Write};
use std::net::{TcpListener, TcpStream, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
Expand Down Expand Up @@ -52,12 +52,14 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
pub fn sync(&mut self) -> Hash {
while let Ok(entry) = self.acc.historian.receiver.try_recv() {
self.last_id = entry.id;
write!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();

for mut subscriber in &self.subscribers {
// TODO: Handle errors. If TCP stream is closed, remove it.
serialize_into(subscriber, &entry).unwrap();
}
let buf = serialize(&entry).expect("serialize");
self.subscribers
.retain(|ref mut subscriber| match subscriber.write(&buf) {
Err(err) => err.kind() != ErrorKind::BrokenPipe,
_ => true,
});
}
self.last_id
}
Expand Down
7 changes: 2 additions & 5 deletions src/accountant_stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl AccountantStub {
let mut buf = vec![0u8; 65_535];
let mut buf_offset = 0;
let mut found = false;
if let Ok(bytes) = self.stream.read(&mut buf) {
if let Ok(_bytes) = self.stream.read(&mut buf) {
loop {
match deserialize::<Entry>(&buf[buf_offset..]) {
Ok(entry) => {
Expand All @@ -100,10 +100,7 @@ impl AccountantStub {
}
}
}
Err(_) => {
println!("read {} of {} in buf", buf_offset, bytes);
break;
}
Err(_) => break,
}
}
}
Expand Down
15 changes: 10 additions & 5 deletions src/bin/client-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ fn main() {
let mint_pubkey = mint.pubkey();

let socket = UdpSocket::bind(send_addr).unwrap();
let stream = TcpStream::connect(send_addr).unwrap();
let stream = TcpStream::connect(addr).unwrap();
stream.set_nonblocking(true).expect("nonblocking");

let mut acc = AccountantStub::new(addr, socket, stream);
let last_id = acc.get_last_id().unwrap();

let txs = acc.get_balance(&mint_pubkey).unwrap().unwrap();
println!("Mint's Initial Balance {}", txs);
let mint_balance = acc.get_balance(&mint_pubkey).unwrap().unwrap();
println!("Mint's Initial Balance {}", mint_balance);

println!("Signing transactions...");
let txs = mint_balance;
let now = Instant::now();
let transactions: Vec<_> = (0..txs)
.map(|_| {
Expand Down Expand Up @@ -66,13 +69,15 @@ fn main() {
acc.transfer_signed(tr).unwrap();
}
println!("Waiting for last transaction to be confirmed...",);
acc.wait_on_signature(&sig, &last_id).unwrap();
if txs > 0 {
acc.wait_on_signature(&sig, &last_id).unwrap();
}

let duration = now.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (txs * 1_000_000_000) as f64 / ns as f64;
println!("Done. {} tps!", tps);
let val = acc.get_balance(&mint_pubkey).unwrap().unwrap();
println!("Mint's Final Balance {}", val);
assert_eq!(val, 0);
assert_eq!(val, mint_balance - txs);
}

0 comments on commit 9db42c1

Please sign in to comment.