Skip to content

Commit

Permalink
replay: do not recheck duplicate confirmation if already confirmed (#…
Browse files Browse the repository at this point in the history
…1237)

* replay: do not recheck duplicate confirmation if already confirmed

* remove supermajority voted as it is not necessary

* pr feedback: renames
  • Loading branch information
AshwinSekar authored Aug 2, 2024
1 parent e32e126 commit 749d6fd
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 119 deletions.
3 changes: 2 additions & 1 deletion core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,8 @@ impl Tower {
}
}

pub(crate) fn is_slot_confirmed(
#[cfg(test)]
fn is_slot_confirmed(
&self,
slot: Slot,
voted_stakes: &VotedStakes,
Expand Down
10 changes: 5 additions & 5 deletions core/src/consensus/progress_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ pub struct ForkStats {
pub vote_threshold: Vec<ThresholdDecision>,
pub is_locked_out: bool,
pub voted_stakes: VotedStakes,
pub is_supermajority_confirmed: bool,
pub duplicate_confirmed_hash: Option<Hash>,
pub computed: bool,
pub lockout_intervals: LockoutIntervals,
pub bank_hash: Option<Hash>,
Expand Down Expand Up @@ -368,15 +368,15 @@ impl ProgressMap {
.and_then(|s| s.fork_stats.my_latest_landed_vote)
}

pub fn set_supermajority_confirmed_slot(&mut self, slot: Slot) {
pub fn set_duplicate_confirmed_hash(&mut self, slot: Slot, hash: Hash) {
let slot_progress = self.get_mut(&slot).unwrap();
slot_progress.fork_stats.is_supermajority_confirmed = true;
slot_progress.fork_stats.duplicate_confirmed_hash = Some(hash);
}

pub fn is_supermajority_confirmed(&self, slot: Slot) -> Option<bool> {
pub fn is_duplicate_confirmed(&self, slot: Slot) -> Option<bool> {
self.progress_map
.get(&slot)
.map(|s| s.fork_stats.is_supermajority_confirmed)
.map(|s| s.fork_stats.duplicate_confirmed_hash.is_some())
}

pub fn get_bank_prev_leader_slot(&self, bank: &Bank) -> Option<Slot> {
Expand Down
163 changes: 50 additions & 113 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,6 @@ enum ForkReplayMode {
Parallel(ThreadPool),
}

#[derive(PartialEq, Eq, Debug)]
enum ConfirmationType {
SupermajorityVoted,
DuplicateConfirmed,
}

enum GenerateVoteTxResult {
// non voting validator, not eligible for refresh
NonVoting,
Expand All @@ -146,31 +140,6 @@ impl GenerateVoteTxResult {
}
}

#[derive(PartialEq, Eq, Debug)]
struct ConfirmedSlot {
slot: Slot,
frozen_hash: Hash,
confirmation_type: ConfirmationType,
}

impl ConfirmedSlot {
fn new_supermajority_voted(slot: Slot, frozen_hash: Hash) -> Self {
Self {
slot,
frozen_hash,
confirmation_type: ConfirmationType::SupermajorityVoted,
}
}

fn new_duplicate_confirmed_slot(slot: Slot, frozen_hash: Hash) -> Self {
Self {
slot,
frozen_hash,
confirmation_type: ConfirmationType::DuplicateConfirmed,
}
}
}

// Implement a destructor for the ReplayStage thread to signal it exited
// even on panics
struct Finalizer {
Expand Down Expand Up @@ -864,16 +833,16 @@ impl ReplayStage {
let mut compute_slot_stats_time = Measure::start("compute_slot_stats_time");
for slot in newly_computed_slot_stats {
let fork_stats = progress.get_fork_stats(slot).unwrap();
let confirmed_slots = Self::confirm_forks(
let duplicate_confirmed_forks = Self::tower_duplicate_confirmed_forks(
&tower,
&fork_stats.voted_stakes,
fork_stats.total_stake,
&progress,
&bank_forks,
);

Self::mark_slots_confirmed(
&confirmed_slots,
Self::mark_slots_duplicate_confirmed(
&duplicate_confirmed_forks,
&blockstore,
&bank_forks,
&mut progress,
Expand Down Expand Up @@ -4117,8 +4086,8 @@ impl ReplayStage {
}

#[allow(clippy::too_many_arguments)]
fn mark_slots_confirmed(
confirmed_slots: &[ConfirmedSlot],
fn mark_slots_duplicate_confirmed(
confirmed_slots: &[(Slot, Hash)],
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
progress: &mut ProgressMap,
Expand All @@ -4131,36 +4100,14 @@ impl ReplayStage {
duplicate_confirmed_slots: &mut DuplicateConfirmedSlots,
) {
let root_slot = bank_forks.read().unwrap().root();
for ConfirmedSlot {
slot,
frozen_hash,
confirmation_type,
} in confirmed_slots.iter()
{
if *confirmation_type == ConfirmationType::SupermajorityVoted {
// This case should be guaranteed as false by confirm_forks()
if let Some(false) = progress.is_supermajority_confirmed(*slot) {
// Because supermajority confirmation will iterate through and update the
// subtree in fork choice, only incur this cost if the slot wasn't already
// confirmed
progress.set_supermajority_confirmed_slot(*slot);
// If the slot was confirmed, then it must be frozen. Otherwise, we couldn't
// have replayed any of its descendants and figured out it was confirmed.
assert!(*frozen_hash != Hash::default());
}
}
for (slot, frozen_hash) in confirmed_slots.iter() {
assert!(*frozen_hash != Hash::default());

if *slot <= root_slot {
continue;
}

match confirmation_type {
ConfirmationType::SupermajorityVoted => (),
ConfirmationType::DuplicateConfirmed => (),
#[allow(unreachable_patterns)]
_ => panic!("programmer error"),
}

progress.set_duplicate_confirmed_hash(*slot, *frozen_hash);
if let Some(prev_hash) = duplicate_confirmed_slots.insert(*slot, *frozen_hash) {
assert_eq!(prev_hash, *frozen_hash);
// Already processed this signal
Expand All @@ -4187,60 +4134,53 @@ impl ReplayStage {
}
}

fn confirm_forks(
fn tower_duplicate_confirmed_forks(
tower: &Tower,
voted_stakes: &VotedStakes,
total_stake: Stake,
progress: &ProgressMap,
bank_forks: &RwLock<BankForks>,
) -> Vec<ConfirmedSlot> {
let mut confirmed_forks = vec![];
) -> Vec<(Slot, Hash)> {
let mut duplicate_confirmed_forks = vec![];
for (slot, prog) in progress.iter() {
if !prog.fork_stats.is_supermajority_confirmed {
let bank = bank_forks
.read()
.unwrap()
.get(*slot)
.expect("bank in progress must exist in BankForks")
.clone();
let duration = prog
.replay_stats
.read()
.unwrap()
.started
.elapsed()
.as_millis();
if bank.is_frozen() && tower.is_slot_confirmed(*slot, voted_stakes, total_stake) {
info!("validator fork confirmed {} {}ms", *slot, duration);
datapoint_info!("validator-confirmation", ("duration_ms", duration, i64));
confirmed_forks
.push(ConfirmedSlot::new_supermajority_voted(*slot, bank.hash()));
} else if bank.is_frozen()
&& tower.is_slot_duplicate_confirmed(*slot, voted_stakes, total_stake)
{
info!(
"validator fork duplicate confirmed {} {}ms",
*slot, duration
);
datapoint_info!(
"validator-duplicate-confirmation",
("duration_ms", duration, i64)
);
confirmed_forks.push(ConfirmedSlot::new_duplicate_confirmed_slot(
*slot,
bank.hash(),
));
} else {
debug!(
"validator fork not confirmed {} {}ms {:?}",
*slot,
duration,
voted_stakes.get(slot)
);
}
if prog.fork_stats.duplicate_confirmed_hash.is_some() {
continue;
}
let bank = bank_forks
.read()
.unwrap()
.get(*slot)
.expect("bank in progress must exist in BankForks");
let duration = prog
.replay_stats
.read()
.unwrap()
.started
.elapsed()
.as_millis();
if !bank.is_frozen() {
continue;
}
if tower.is_slot_duplicate_confirmed(*slot, voted_stakes, total_stake) {
info!(
"validator fork duplicate confirmed {} {}ms",
*slot, duration
);
datapoint_info!(
"validator-duplicate-confirmation",
("duration_ms", duration, i64)
);
duplicate_confirmed_forks.push((*slot, bank.hash()));
} else {
debug!(
"validator fork not confirmed {} {}ms {:?}",
*slot,
duration,
voted_stakes.get(slot)
);
}
}
confirmed_forks
duplicate_confirmed_forks
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -5552,7 +5492,7 @@ pub(crate) mod tests {
// bank 1, so no slot should be confirmed.
{
let fork_progress = progress.get(&0).unwrap();
let confirmed_forks = ReplayStage::confirm_forks(
let confirmed_forks = ReplayStage::tower_duplicate_confirmed_forks(
&tower,
&fork_progress.fork_stats.voted_stakes,
fork_progress.fork_stats.total_stake,
Expand Down Expand Up @@ -5602,18 +5542,15 @@ pub(crate) mod tests {
assert_eq!(newly_computed, vec![1]);
{
let fork_progress = progress.get(&1).unwrap();
let confirmed_forks = ReplayStage::confirm_forks(
let confirmed_forks = ReplayStage::tower_duplicate_confirmed_forks(
&tower,
&fork_progress.fork_stats.voted_stakes,
fork_progress.fork_stats.total_stake,
&progress,
&bank_forks,
);
// No new stats should have been computed
assert_eq!(
confirmed_forks,
vec![ConfirmedSlot::new_supermajority_voted(0, bank0.hash())]
);
assert_eq!(confirmed_forks, vec![(0, bank0.hash())]);
}

let ancestors = bank_forks.read().unwrap().ancestors();
Expand Down

0 comments on commit 749d6fd

Please sign in to comment.