Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sleep between events if PoH is disabled #286

Merged
merged 1 commit into from
May 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 68 additions & 32 deletions src/record_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use entry::Entry;
use hash::Hash;
use recorder::Recorder;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError};
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
use transaction::Transaction;
Expand All @@ -27,10 +27,29 @@ pub struct RecordStage {
impl RecordStage {
/// A background thread that will continue tagging received Event messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
pub fn new(
transaction_receiver: Receiver<Signal>,
pub fn new(signal_receiver: Receiver<Signal>, start_hash: &Hash) -> Self {
let (entry_sender, entry_receiver) = channel();
let start_hash = start_hash.clone();

let thread_hdl = Builder::new()
.name("solana-record-stage".to_string())
.spawn(move || {
let mut recorder = Recorder::new(start_hash);
let _ = Self::process_signals(&mut recorder, &signal_receiver, &entry_sender);
})
.unwrap();

RecordStage {
entry_receiver,
thread_hdl,
}
}

/// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`.
pub fn new_with_clock(
signal_receiver: Receiver<Signal>,
start_hash: &Hash,
tick_duration: Option<Duration>,
tick_duration: Duration,
) -> Self {
let (entry_sender, entry_receiver) = channel();
let start_hash = start_hash.clone();
Expand All @@ -39,19 +58,18 @@ impl RecordStage {
.name("solana-record-stage".to_string())
.spawn(move || {
let mut recorder = Recorder::new(start_hash);
let duration_data = tick_duration.map(|dur| (Instant::now(), dur));
let start_time = Instant::now();
loop {
if let Err(_) = Self::process_transactions(
if let Err(_) = Self::try_process_signals(
&mut recorder,
duration_data,
&transaction_receiver,
start_time,
tick_duration,
&signal_receiver,
&entry_sender,
) {
return;
}
if duration_data.is_some() {
recorder.hash();
}
recorder.hash();
}
})
.unwrap();
Expand All @@ -62,29 +80,46 @@ impl RecordStage {
}
}

pub fn process_transactions(
fn process_signal(
signal: Signal,
recorder: &mut Recorder,
sender: &Sender<Entry>,
) -> Result<(), ()> {
let txs = if let Signal::Events(txs) = signal {
txs
} else {
vec![]
};
let entry = recorder.record(txs);
sender.send(entry).map_err(|_| ())
}

fn process_signals(
recorder: &mut Recorder,
duration_data: Option<(Instant, Duration)>,
receiver: &Receiver<Signal>,
sender: &Sender<Entry>,
) -> Result<(), ()> {
loop {
if let Some((start_time, tick_duration)) = duration_data {
if let Some(entry) = recorder.tick(start_time, tick_duration) {
sender.send(entry).or(Err(()))?;
}
match receiver.recv() {
Ok(signal) => Self::process_signal(signal, recorder, sender)?,
Err(RecvError) => return Err(()),
}
}
}

fn try_process_signals(
recorder: &mut Recorder,
start_time: Instant,
tick_duration: Duration,
receiver: &Receiver<Signal>,
sender: &Sender<Entry>,
) -> Result<(), ()> {
loop {
if let Some(entry) = recorder.tick(start_time, tick_duration) {
sender.send(entry).or(Err(()))?;
}
match receiver.try_recv() {
Ok(signal) => match signal {
Signal::Tick => {
let entry = recorder.record(vec![]);
sender.send(entry).or(Err(()))?;
}
Signal::Events(transactions) => {
let entry = recorder.record(transactions);
sender.send(entry).or(Err(()))?;
}
},
Ok(signal) => Self::process_signal(signal, recorder, sender)?,
Err(TryRecvError::Empty) => return Ok(()),
Err(TryRecvError::Disconnected) => return Err(()),
};
Expand All @@ -104,7 +139,7 @@ mod tests {
fn test_historian() {
let (tx_sender, tx_receiver) = channel();
let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero, None);
let record_stage = RecordStage::new(tx_receiver, &zero);

tx_sender.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000));
Expand All @@ -130,7 +165,7 @@ mod tests {
fn test_historian_closed_sender() {
let (tx_sender, tx_receiver) = channel();
let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero, None);
let record_stage = RecordStage::new(tx_receiver, &zero);
drop(record_stage.entry_receiver);
tx_sender.send(Signal::Tick).unwrap();
assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
Expand All @@ -140,7 +175,7 @@ mod tests {
fn test_transactions() {
let (tx_sender, signal_receiver) = channel();
let zero = Hash::default();
let record_stage = RecordStage::new(signal_receiver, &zero, None);
let record_stage = RecordStage::new(signal_receiver, &zero);
let alice_keypair = KeyPair::new();
let bob_pubkey = KeyPair::new().pubkey();
let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero);
Expand All @@ -153,10 +188,11 @@ mod tests {

#[test]
#[ignore]
fn test_ticking_historian() {
fn test_clock() {
let (tx_sender, tx_receiver) = channel();
let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero, Some(Duration::from_millis(20)));
let record_stage =
RecordStage::new_with_clock(tx_receiver, &zero, Duration::from_millis(20));
sleep(Duration::from_millis(900));
tx_sender.send(Signal::Tick).unwrap();
drop(tx_sender);
Expand Down
10 changes: 8 additions & 2 deletions src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@ impl Tpu {
packet_recycler.clone(),
);

let record_stage =
RecordStage::new(banking_stage.signal_receiver, &start_hash, tick_duration);
let record_stage = match tick_duration {
Some(tick_duration) => RecordStage::new_with_clock(
banking_stage.signal_receiver,
&start_hash,
tick_duration,
),
None => RecordStage::new(banking_stage.signal_receiver, &start_hash),
};

let write_stage = WriteStage::new(
bank.clone(),
Expand Down