Skip to content

Commit

Permalink
Sleep between events if PoH is disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
garious committed May 30, 2018
1 parent a8e1c44 commit 49d705a
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 34 deletions.
100 changes: 68 additions & 32 deletions src/record_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use entry::Entry;
use hash::Hash;
use recorder::Recorder;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError};
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
use transaction::Transaction;
Expand All @@ -27,10 +27,29 @@ pub struct RecordStage {
impl RecordStage {
/// A background thread that will continue tagging received Event messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
pub fn new(
transaction_receiver: Receiver<Signal>,
pub fn new(signal_receiver: Receiver<Signal>, start_hash: &Hash) -> Self {
let (entry_sender, entry_receiver) = channel();
let start_hash = start_hash.clone();

let thread_hdl = Builder::new()
.name("solana-record-stage".to_string())
.spawn(move || {
let mut recorder = Recorder::new(start_hash);
let _ = Self::process_signals(&mut recorder, &signal_receiver, &entry_sender);
})
.unwrap();

RecordStage {
entry_receiver,
thread_hdl,
}
}

/// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`.
pub fn new_with_clock(
signal_receiver: Receiver<Signal>,
start_hash: &Hash,
tick_duration: Option<Duration>,
tick_duration: Duration,
) -> Self {
let (entry_sender, entry_receiver) = channel();
let start_hash = start_hash.clone();
Expand All @@ -39,19 +58,18 @@ impl RecordStage {
.name("solana-record-stage".to_string())
.spawn(move || {
let mut recorder = Recorder::new(start_hash);
let duration_data = tick_duration.map(|dur| (Instant::now(), dur));
let start_time = Instant::now();
loop {
if let Err(_) = Self::process_transactions(
if let Err(_) = Self::try_process_signals(
&mut recorder,
duration_data,
&transaction_receiver,
start_time,
tick_duration,
&signal_receiver,
&entry_sender,
) {
return;
}
if duration_data.is_some() {
recorder.hash();
}
recorder.hash();
}
})
.unwrap();
Expand All @@ -62,29 +80,46 @@ impl RecordStage {
}
}

pub fn process_transactions(
fn process_signal(
signal: Signal,
recorder: &mut Recorder,
sender: &Sender<Entry>,
) -> Result<(), ()> {
let txs = if let Signal::Events(txs) = signal {
txs
} else {
vec![]
};
let entry = recorder.record(txs);
sender.send(entry).map_err(|_| ())
}

fn process_signals(
recorder: &mut Recorder,
duration_data: Option<(Instant, Duration)>,
receiver: &Receiver<Signal>,
sender: &Sender<Entry>,
) -> Result<(), ()> {
loop {
if let Some((start_time, tick_duration)) = duration_data {
if let Some(entry) = recorder.tick(start_time, tick_duration) {
sender.send(entry).or(Err(()))?;
}
match receiver.recv() {
Ok(signal) => Self::process_signal(signal, recorder, sender)?,
Err(RecvError) => return Err(()),
}
}
}

fn try_process_signals(
recorder: &mut Recorder,
start_time: Instant,
tick_duration: Duration,
receiver: &Receiver<Signal>,
sender: &Sender<Entry>,
) -> Result<(), ()> {
loop {
if let Some(entry) = recorder.tick(start_time, tick_duration) {
sender.send(entry).or(Err(()))?;
}
match receiver.try_recv() {
Ok(signal) => match signal {
Signal::Tick => {
let entry = recorder.record(vec![]);
sender.send(entry).or(Err(()))?;
}
Signal::Events(transactions) => {
let entry = recorder.record(transactions);
sender.send(entry).or(Err(()))?;
}
},
Ok(signal) => Self::process_signal(signal, recorder, sender)?,
Err(TryRecvError::Empty) => return Ok(()),
Err(TryRecvError::Disconnected) => return Err(()),
};
Expand All @@ -104,7 +139,7 @@ mod tests {
fn test_historian() {
let (tx_sender, tx_receiver) = channel();
let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero, None);
let record_stage = RecordStage::new(tx_receiver, &zero);

tx_sender.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000));
Expand All @@ -130,7 +165,7 @@ mod tests {
fn test_historian_closed_sender() {
let (tx_sender, tx_receiver) = channel();
let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero, None);
let record_stage = RecordStage::new(tx_receiver, &zero);
drop(record_stage.entry_receiver);
tx_sender.send(Signal::Tick).unwrap();
assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
Expand All @@ -140,7 +175,7 @@ mod tests {
fn test_transactions() {
let (tx_sender, signal_receiver) = channel();
let zero = Hash::default();
let record_stage = RecordStage::new(signal_receiver, &zero, None);
let record_stage = RecordStage::new(signal_receiver, &zero);
let alice_keypair = KeyPair::new();
let bob_pubkey = KeyPair::new().pubkey();
let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero);
Expand All @@ -153,10 +188,11 @@ mod tests {

#[test]
#[ignore]
fn test_ticking_historian() {
fn test_clock() {
let (tx_sender, tx_receiver) = channel();
let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero, Some(Duration::from_millis(20)));
let record_stage =
RecordStage::new_with_clock(tx_receiver, &zero, Duration::from_millis(20));
sleep(Duration::from_millis(900));
tx_sender.send(Signal::Tick).unwrap();
drop(tx_sender);
Expand Down
10 changes: 8 additions & 2 deletions src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@ impl Tpu {
packet_recycler.clone(),
);

let record_stage =
RecordStage::new(banking_stage.signal_receiver, &start_hash, tick_duration);
let record_stage: RecordStage = match tick_duration {
Some(tick_duration) => RecordStage::new_with_clock(
banking_stage.signal_receiver,
&start_hash,
tick_duration,
),
None => RecordStage::new(banking_stage.signal_receiver, &start_hash),
};

let write_stage = WriteStage::new(
bank.clone(),
Expand Down

0 comments on commit 49d705a

Please sign in to comment.