Commit cbeca20: pull leader_scheduler out of process_entries()

rob-solana committed Feb 26, 2019 (1 parent: bc2d4c7)

Showing 4 changed files with 20 additions and 44 deletions.
src/blocktree_processor.rs (6 additions, 40 deletions)

@@ -50,23 +50,14 @@ fn par_execute_entries(bank: &Bank, entries: &[(&Entry, Vec<Result<()>>)]) -> Re
 /// 1. In order lock accounts for each entry while the lock succeeds, up to a Tick entry
 /// 2. Process the locked group in parallel
 /// 3. Register the `Tick` if it's available
-/// 4. Update the leader scheduler, goto 1
-fn par_process_entries_with_scheduler(
-    bank: &Bank,
-    entries: &[Entry],
-    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
-) -> Result<()> {
+fn par_process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> {
     // accumulator for entries that can be processed in parallel
     let mut mt_group = vec![];
     for entry in entries {
         if entry.is_tick() {
             // if its a tick, execute the group and register the tick
             par_execute_entries(bank, &mt_group)?;
             bank.register_tick(&entry.id);
-            leader_scheduler
-                .write()
-                .unwrap()
-                .update_tick_height(bank.tick_height(), bank);
             mt_group = vec![];
             continue;
         }
@@ -91,30 +82,8 @@ fn par_process_entries_with_scheduler(
 }
 
 /// Process an ordered list of entries.
-pub fn process_entries(
-    bank: &Bank,
-    entries: &[Entry],
-    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
-) -> Result<()> {
-    par_process_entries_with_scheduler(bank, entries, leader_scheduler)
-}
-
-/// Process an ordered list of entries, populating a circular buffer "tail"
-/// as we go.
-fn process_block(
-    bank: &Bank,
-    entries: &[Entry],
-    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
-) -> Result<()> {
-    for entry in entries {
-        process_entry(bank, entry)?;
-        if entry.is_tick() {
-            let mut leader_scheduler = leader_scheduler.write().unwrap();
-            leader_scheduler.update_tick_height(bank.tick_height(), bank);
-        }
-    }
-
-    Ok(())
+pub fn process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> {
+    par_process_entries(bank, entries)
 }
 
 #[derive(Debug, PartialEq)]
@@ -198,11 +167,13 @@ pub fn process_blocktree(
             return Err(BankError::LedgerVerificationFailed);
         }
 
-        process_block(&bank, &entries, &leader_scheduler).map_err(|err| {
+        process_entries(&bank, &entries).map_err(|err| {
             warn!("Failed to process entries for slot {}: {:?}", slot, err);
             BankError::LedgerVerificationFailed
         })?;
 
+        leader_scheduler.write().unwrap().update(&bank);
+
         last_entry_id = entries.last().unwrap().id;
         entry_height += entries.len() as u64;
     }
@@ -388,11 +359,6 @@ mod tests {
         );
     }
 
-    fn par_process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> {
-        let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
-        par_process_entries_with_scheduler(bank, entries, &leader_scheduler)
-    }
-
     #[test]
     fn test_process_empty_entry_is_registered() {
         let (genesis_block, mint_keypair) = GenesisBlock::new(2);
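
The net effect in blocktree_processor is a scheduler-free public API: `process_entries` now touches only the bank, and callers refresh the leader schedule themselves afterward. Below is a minimal sketch of the resulting calling pattern; the `replay_slot` wrapper is hypothetical, while `Bank`, `Entry`, `LeaderScheduler`, and `process_entries` are the crate's own names:

```rust
use std::sync::{Arc, RwLock};

// Hypothetical caller illustrating the new division of labor: replay the
// entries against the bank, then refresh the leader schedule from the
// bank's resulting tick height.
fn replay_slot(
    bank: &Bank,
    entries: &[Entry],
    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<()> {
    // process_entries() no longer takes (or updates) the scheduler.
    process_entries(bank, entries)?;
    // The caller updates the scheduler once per batch instead.
    leader_scheduler.write().unwrap().update(bank);
    Ok(())
}
```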
src/fullnode.rs (2 additions, 2 deletions)

@@ -596,14 +596,14 @@ mod tests {
     }
 
     #[test]
-    fn test_wrong_role_transition() {
+    fn test_ledger_role_transition() {
         solana_logger::setup();
 
         let mut fullnode_config = FullnodeConfig::default();
         let ticks_per_slot = 16;
         let slots_per_epoch = 2;
         fullnode_config.leader_scheduler_config =
-            LeaderSchedulerConfig::new(ticks_per_slot, slots_per_epoch, slots_per_epoch);
+            LeaderSchedulerConfig::new(ticks_per_slot, slots_per_epoch, std::u64::MAX);
 
         // Create the leader and validator nodes
         let bootstrap_leader_keypair = Arc::new(Keypair::new());
src/leader_scheduler.rs (9 additions, 1 deletion)

@@ -198,12 +198,14 @@ impl LeaderScheduler {
 
     // Inform the leader scheduler about the current tick height of the cluster. It may generate a
     // new schedule as a side-effect.
+    // TODO: Remove this function, use `update` instead
     pub fn update_tick_height(&mut self, tick_height: u64, bank: &Bank) {
         let epoch = self.tick_height_to_epoch(tick_height);
         trace!(
-            "update_tick_height: tick_height={} (epoch={})",
+            "update_tick_height: tick_height={} (epoch={}) bank.id()={}",
             tick_height,
             epoch,
+            bank.id(),
         );
 
         if tick_height == 0 {
@@ -218,6 +220,12 @@
         }
     }
 
+    // Update the leader scheduler with the given bank. It may generate a
+    // new schedule as a side-effect.
+    pub fn update(&mut self, bank: &Bank) {
+        self.update_tick_height(bank.tick_height(), bank);
+    }
+
     pub fn get_leader_for_tick(&self, tick: u64) -> Option<Pubkey> {
         self.get_leader_for_slot(self.tick_height_to_slot(tick))
     }
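
The new `update` is a thin convenience over `update_tick_height`: it reads the tick height off the bank rather than taking it as a parameter. A small sketch of the equivalence, using a hypothetical `refresh_schedule` helper:

```rust
// Hypothetical helper: the two forms below do the same thing, and the
// explicit form is the one the new TODO slates for removal.
fn refresh_schedule(scheduler: &mut LeaderScheduler, bank: &Bank) {
    scheduler.update(bank);
    // ...equivalent to the older, explicit call:
    // scheduler.update_tick_height(bank.tick_height(), bank);
}
```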
src/replay_stage.rs (3 additions, 1 deletion)

@@ -125,7 +125,7 @@ impl ReplayStage {
                 // If we don't process the entry now, the for loop will exit and the entry
                 // will be dropped.
                 if 0 == num_ticks_to_next_vote || (i + 1) == entries.len() {
-                    res = blocktree_processor::process_entries(bank, &entries[0..=i], leader_scheduler);
+                    res = blocktree_processor::process_entries(bank, &entries[0..=i]);
 
                     if res.is_err() {
                         // TODO: This will return early from the first entry that has an erroneous
@@ -138,6 +138,8 @@
                         break;
                     }
 
+                leader_scheduler.write().unwrap().update(&bank);
+
                 if 0 == num_ticks_to_next_vote {
                     subscriptions.notify_subscribers(&bank);
                     if let Some(voting_keypair) = voting_keypair {
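
ReplayStage ends up with the same division of labor as process_blocktree: replay a batch of entries, then update the scheduler once from the bank. A compressed, hypothetical view of the loop after this commit; `replay_batches` and `entry_batches` stand in for the real `num_ticks_to_next_vote` slicing logic:

```rust
use std::sync::{Arc, RwLock};

// Sketch only: batching is simplified to a slice of ready-made batches.
fn replay_batches(
    bank: &Arc<Bank>,
    entry_batches: &[Vec<Entry>],
    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<()> {
    for batch in entry_batches {
        // Bank-only replay of the batch.
        blocktree_processor::process_entries(bank, batch)?;
        // The scheduler update now lives in the caller, once per batch,
        // rather than on every tick inside process_entries().
        leader_scheduler.write().unwrap().update(bank);
    }
    Ok(())
}
```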
