Skip to content

Commit

Permalink
reduce locking in propagated check for VoteStateUpdate (solana-labs#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
AshwinSekar authored Nov 15, 2023
1 parent eb7e68f commit fb76b4c
Showing 1 changed file with 139 additions and 1 deletion.
140 changes: 139 additions & 1 deletion core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use {
vote_transaction::VoteTransaction,
},
std::{
cmp::max,
collections::{HashMap, HashSet},
iter::repeat,
sync::{
Expand Down Expand Up @@ -499,6 +500,7 @@ impl ClusterInfoVoteListener {
) -> Result<()> {
let mut confirmation_verifier =
OptimisticConfirmationVerifier::new(bank_forks.read().unwrap().root());
let mut latest_vote_slot_per_validator = HashMap::new();
let mut last_process_root = Instant::now();
let duplicate_confirmed_slot_sender = Some(duplicate_confirmed_slot_sender);
let mut vote_processing_time = Some(VoteProcessingTiming::default());
Expand Down Expand Up @@ -533,6 +535,7 @@ impl ClusterInfoVoteListener {
&bank_notification_sender,
&duplicate_confirmed_slot_sender,
&mut vote_processing_time,
&mut latest_vote_slot_per_validator,
);
match confirmed_slots {
Ok(confirmed_slots) => {
Expand Down Expand Up @@ -573,6 +576,7 @@ impl ClusterInfoVoteListener {
&None,
&None,
&mut None,
&mut HashMap::new(),
)
}

Expand All @@ -588,6 +592,7 @@ impl ClusterInfoVoteListener {
bank_notification_sender: &Option<BankNotificationSender>,
duplicate_confirmed_slot_sender: &Option<DuplicateConfirmedSlotsSender>,
vote_processing_time: &mut Option<VoteProcessingTiming>,
latest_vote_slot_per_validator: &mut HashMap<Pubkey, Slot>,
) -> Result<ThresholdConfirmedSlots> {
let mut sel = Select::new();
sel.recv(gossip_vote_txs_receiver);
Expand Down Expand Up @@ -617,6 +622,7 @@ impl ClusterInfoVoteListener {
bank_notification_sender,
duplicate_confirmed_slot_sender,
vote_processing_time,
latest_vote_slot_per_validator,
));
}
remaining_wait_time = remaining_wait_time.saturating_sub(start.elapsed());
Expand All @@ -639,13 +645,18 @@ impl ClusterInfoVoteListener {
is_gossip_vote: bool,
bank_notification_sender: &Option<BankNotificationSender>,
duplicate_confirmed_slot_sender: &Option<DuplicateConfirmedSlotsSender>,
latest_vote_slot_per_validator: &mut HashMap<Pubkey, Slot>,
) {
if vote.is_empty() {
return;
}

let (last_vote_slot, last_vote_hash) = vote.last_voted_slot_hash().unwrap();

let latest_vote_slot = latest_vote_slot_per_validator
.entry(*vote_pubkey)
.or_insert(0);

let root = root_bank.slot();
let mut is_new_vote = false;
let vote_slots = vote.slots();
Expand Down Expand Up @@ -724,6 +735,14 @@ impl ClusterInfoVoteListener {
is_new_vote = is_new;
}

if slot < *latest_vote_slot {
// Important that we filter after the `last_vote_slot` check, as even if this vote
// is old, we still need to track optimistic confirmations.
// However it is fine to filter the rest of the slots for the propagated check tracking below,
// as the propagated check is able to roll up votes for descendants unlike optimistic confirmation.
continue;
}

diff.entry(slot)
.or_default()
.entry(*vote_pubkey)
Expand All @@ -733,6 +752,8 @@ impl ClusterInfoVoteListener {
.or_insert(is_gossip_vote);
}

*latest_vote_slot = max(*latest_vote_slot, last_vote_slot);

if is_new_vote {
subscriptions.notify_vote(*vote_pubkey, vote, vote_transaction_signature);
let _ = verified_vote_sender.send((*vote_pubkey, vote_slots));
Expand All @@ -751,6 +772,7 @@ impl ClusterInfoVoteListener {
bank_notification_sender: &Option<BankNotificationSender>,
duplicate_confirmed_slot_sender: &Option<DuplicateConfirmedSlotsSender>,
vote_processing_time: &mut Option<VoteProcessingTiming>,
latest_vote_slot_per_validator: &mut HashMap<Pubkey, Slot>,
) -> ThresholdConfirmedSlots {
let mut diff: HashMap<Slot, HashMap<Pubkey, bool>> = HashMap::new();
let mut new_optimistic_confirmed_slots = vec![];
Expand All @@ -777,6 +799,7 @@ impl ClusterInfoVoteListener {
is_gossip,
bank_notification_sender,
duplicate_confirmed_slot_sender,
latest_vote_slot_per_validator,
);
}
gossip_vote_txn_processing_time.stop();
Expand Down Expand Up @@ -875,6 +898,7 @@ mod tests {
use {
super::*,
crate::banking_trace::BankingTracer,
itertools::Itertools,
solana_perf::packet,
solana_rpc::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
solana_runtime::{
Expand All @@ -890,7 +914,10 @@ mod tests {
signature::{Keypair, Signature, Signer},
},
solana_vote::vote_sender_types::ReplayVoteSender,
solana_vote_program::{vote_state::Vote, vote_transaction},
solana_vote_program::{
vote_state::{Vote, VoteStateUpdate},
vote_transaction,
},
std::{
collections::BTreeSet,
iter::repeat_with,
Expand Down Expand Up @@ -987,6 +1014,7 @@ mod tests {
let (verified_vote_sender, _verified_vote_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
let (replay_votes_sender, replay_votes_receiver) = unbounded();
let mut latest_vote_slot_per_validator = HashMap::new();

let GenesisConfigInfo { genesis_config, .. } =
genesis_utils::create_genesis_config_with_vote_accounts(
Expand Down Expand Up @@ -1022,6 +1050,7 @@ mod tests {
&None,
&None,
&mut None,
&mut latest_vote_slot_per_validator,
)
.unwrap();

Expand Down Expand Up @@ -1054,6 +1083,7 @@ mod tests {
&None,
&None,
&mut None,
&mut latest_vote_slot_per_validator,
)
.unwrap();

Expand Down Expand Up @@ -1105,6 +1135,7 @@ mod tests {
let (replay_votes_sender, replay_votes_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
let (verified_vote_sender, verified_vote_receiver) = unbounded();
let mut latest_vote_slot_per_validator = HashMap::new();

let GenesisConfigInfo { genesis_config, .. } =
genesis_utils::create_genesis_config_with_vote_accounts(
Expand Down Expand Up @@ -1137,6 +1168,7 @@ mod tests {
&None,
&None,
&mut None,
&mut latest_vote_slot_per_validator,
)
.unwrap();

Expand Down Expand Up @@ -1255,6 +1287,7 @@ mod tests {
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
let (verified_vote_sender, verified_vote_receiver) = unbounded();
let (_replay_votes_sender, replay_votes_receiver) = unbounded();
let mut latest_vote_slot_per_validator = HashMap::new();

let mut expected_votes = vec![];
let num_voters_per_slot = 2;
Expand Down Expand Up @@ -1295,6 +1328,7 @@ mod tests {
&None,
&None,
&mut None,
&mut latest_vote_slot_per_validator,
)
.unwrap();

Expand Down Expand Up @@ -1340,6 +1374,7 @@ mod tests {
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
let (replay_votes_sender, replay_votes_receiver): (ReplayVoteSender, ReplayVoteReceiver) =
unbounded();
let mut latest_vote_slot_per_validator = HashMap::new();

let vote_slot = 1;
let vote_bank_hash = Hash::default();
Expand Down Expand Up @@ -1396,6 +1431,7 @@ mod tests {
&None,
&None,
&mut None,
&mut latest_vote_slot_per_validator,
);
}
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap();
Expand Down Expand Up @@ -1454,6 +1490,7 @@ mod tests {
Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
));
let mut latest_vote_slot_per_validator = HashMap::new();

// Send a vote to process, should add a reference to the pubkey for that voter
// in the tracker
Expand Down Expand Up @@ -1489,6 +1526,7 @@ mod tests {
&None,
&None,
&mut None,
&mut latest_vote_slot_per_validator,
);

// Setup next epoch
Expand Down Expand Up @@ -1536,6 +1574,7 @@ mod tests {
&None,
&None,
&mut None,
&mut latest_vote_slot_per_validator,
);
}

Expand Down Expand Up @@ -1761,4 +1800,103 @@ mod tests {
.previously_sent_to_bank_votes
.is_empty());
}

#[test]
fn test_track_new_votes_filter() {
let validator_keypairs: Vec<_> =
(0..2).map(|_| ValidatorVoteKeypairs::new_rand()).collect();

let GenesisConfigInfo { genesis_config, .. } =
genesis_utils::create_genesis_config_with_vote_accounts(
10_000,
&validator_keypairs,
vec![100; validator_keypairs.len()],
);
let bank = Bank::new_for_tests(&genesis_config);
let exit = Arc::new(AtomicBool::new(false));
let bank_forks = BankForks::new_rw_arc(bank);
let bank = bank_forks.read().unwrap().get(0).unwrap();
let vote_tracker = VoteTracker::default();
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
));
let mut latest_vote_slot_per_validator = HashMap::new();

let (verified_vote_sender, _verified_vote_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
let mut diff = HashMap::default();
let mut new_optimistic_confirmed_slots = vec![];

let validator0_keypairs = &validator_keypairs[0];
let (vote_pubkey, vote, _, signature) = vote_parser::parse_vote_transaction(
&vote_transaction::new_vote_state_update_transaction(
VoteStateUpdate::from(vec![(1, 3), (2, 2), (6, 1)]),
Hash::default(),
&validator0_keypairs.node_keypair,
&validator0_keypairs.vote_keypair,
&validator0_keypairs.vote_keypair,
None,
),
)
.unwrap();

ClusterInfoVoteListener::track_new_votes_and_notify_confirmations(
vote,
&vote_pubkey,
signature,
&vote_tracker,
&bank,
&subscriptions,
&verified_vote_sender,
&gossip_verified_vote_hash_sender,
&mut diff,
&mut new_optimistic_confirmed_slots,
true, /* is gossip */
&None,
&None,
&mut latest_vote_slot_per_validator,
);
assert_eq!(diff.keys().copied().sorted().collect_vec(), vec![1, 2, 6]);

// Vote on a new slot, only those later than 6 should show up. 4 is skipped.
diff.clear();
let (vote_pubkey, vote, _, signature) = vote_parser::parse_vote_transaction(
&vote_transaction::new_vote_state_update_transaction(
VoteStateUpdate::from(vec![(1, 6), (2, 5), (3, 4), (4, 3), (7, 2), (8, 1)]),
Hash::default(),
&validator0_keypairs.node_keypair,
&validator0_keypairs.vote_keypair,
&validator0_keypairs.vote_keypair,
None,
),
)
.unwrap();

ClusterInfoVoteListener::track_new_votes_and_notify_confirmations(
vote,
&vote_pubkey,
signature,
&vote_tracker,
&bank,
&subscriptions,
&verified_vote_sender,
&gossip_verified_vote_hash_sender,
&mut diff,
&mut new_optimistic_confirmed_slots,
true, /* is gossip */
&None,
&None,
&mut latest_vote_slot_per_validator,
);
assert_eq!(diff.keys().copied().sorted().collect_vec(), vec![7, 8]);
}
}

0 comments on commit fb76b4c

Please sign in to comment.