Skip to content

Commit

Permalink
Add RestartLastVotedForkSlots for wen_restart. (solana-labs#33239)
Browse files Browse the repository at this point in the history
* Add RestartLastVotedForkSlots and RestartHeaviestFork for wen_restart.

* Fix linter errors.

* Revert RestartHeaviestFork, it will be added in another PR.

* Update frozen abi message.

* Fix wrong number in test generation, change to pub(crate) to limit scope.

* Separate push_epoch_slots and push_restart_last_voted_fork_slots.

* Add RestartLastVotedForkSlots data structure.

* Remove unused parts to make PR smaller.

* Remove unused clone.

* Use CompressedSlotsVec to share code between EpochSlots and RestartLastVotedForkSlots.

* Add total_messages to show how many messages are there.

* Reduce RestartLastVotedForkSlots to one packet (16k slots).

* Replace last_vote_slot with shred_version, revert CompressedSlotsVec.
  • Loading branch information
wen-coding authored Oct 9, 2023
1 parent 55f3f20 commit 0a38108
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 8 deletions.
7 changes: 4 additions & 3 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "EnbW8mYTsPMndq9NkHLTkHJgduXvWSfSD6bBdmqQ8TiF")]
#[frozen_abi(digest = "CVvKB495YW6JN4w1rWwajyZmG5wvNhmD97V99rSv9fGw")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
Expand Down Expand Up @@ -393,7 +393,8 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
CrdsData::AccountsHashes(_) => true,
CrdsData::LowestSlot(_, _)
| CrdsData::LegacyVersion(_)
| CrdsData::DuplicateShred(_, _) => {
| CrdsData::DuplicateShred(_, _)
| CrdsData::RestartLastVotedForkSlots(_) => {
let stake = stakes.get(&value.pubkey()).copied();
stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
}
Expand Down Expand Up @@ -4020,7 +4021,7 @@ mod tests {
ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, values.clone())
.collect();
let self_pubkey = solana_sdk::pubkey::new_rand();
assert!(splits.len() * 3 < NUM_CRDS_VALUES);
assert!(splits.len() * 2 < NUM_CRDS_VALUES);
// Assert that all messages are included in the splits.
assert_eq!(NUM_CRDS_VALUES, splits.iter().map(Vec::len).sum::<usize>());
splits
Expand Down
20 changes: 20 additions & 0 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,16 @@ pub(crate) fn submit_gossip_stats(
("SnapshotHashes-pull", crds_stats.pull.counts[10], i64),
("ContactInfo-push", crds_stats.push.counts[11], i64),
("ContactInfo-pull", crds_stats.pull.counts[11], i64),
(
"RestartLastVotedForkSlots-push",
crds_stats.push.counts[12],
i64
),
(
"RestartLastVotedForkSlots-pull",
crds_stats.pull.counts[12],
i64
),
(
"all-push",
crds_stats.push.counts.iter().sum::<usize>(),
Expand Down Expand Up @@ -664,6 +674,16 @@ pub(crate) fn submit_gossip_stats(
("SnapshotHashes-pull", crds_stats.pull.fails[10], i64),
("ContactInfo-push", crds_stats.push.fails[11], i64),
("ContactInfo-pull", crds_stats.pull.fails[11], i64),
(
"RestartLastVotedForkSlots-push",
crds_stats.push.fails[12],
i64
),
(
"RestartLastVotedForkSlots-pull",
crds_stats.pull.fails[12],
i64
),
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
);
Expand Down
3 changes: 2 additions & 1 deletion gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub enum GossipRoute<'a> {
PushMessage(/*from:*/ &'a Pubkey),
}

type CrdsCountsArray = [usize; 12];
type CrdsCountsArray = [usize; 13];

pub(crate) struct CrdsDataStats {
pub(crate) counts: CrdsCountsArray,
Expand Down Expand Up @@ -721,6 +721,7 @@ impl CrdsDataStats {
CrdsData::DuplicateShred(_, _) => 9,
CrdsData::SnapshotHashes(_) => 10,
CrdsData::ContactInfo(_) => 11,
CrdsData::RestartLastVotedForkSlots(_) => 12,
// Update CrdsCountsArray if new items are added here.
}
}
Expand Down
156 changes: 153 additions & 3 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use {
crate::{
cluster_info::MAX_ACCOUNTS_HASHES,
cluster_info::{MAX_ACCOUNTS_HASHES, MAX_CRDS_OBJECT_SIZE},
contact_info::ContactInfo,
deprecated,
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
epoch_slots::EpochSlots,
epoch_slots::{CompressedSlots, EpochSlots, MAX_SLOTS_PER_ENTRY},
legacy_contact_info::LegacyContactInfo,
},
bincode::{serialize, serialized_size},
Expand Down Expand Up @@ -94,6 +94,7 @@ pub enum CrdsData {
DuplicateShred(DuplicateShredIndex, DuplicateShred),
SnapshotHashes(SnapshotHashes),
ContactInfo(ContactInfo),
RestartLastVotedForkSlots(RestartLastVotedForkSlots),
}

impl Sanitize for CrdsData {
Expand Down Expand Up @@ -132,6 +133,7 @@ impl Sanitize for CrdsData {
}
CrdsData::SnapshotHashes(val) => val.sanitize(),
CrdsData::ContactInfo(node) => node.sanitize(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.sanitize(),
}
}
}
Expand All @@ -145,7 +147,7 @@ pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
impl CrdsData {
/// New random CrdsData for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
let kind = rng.gen_range(0..7);
let kind = rng.gen_range(0..8);
// TODO: Implement other kinds of CrdsData here.
// TODO: Assign ranges to each arm proportional to their frequency in
// the mainnet crds table.
Expand All @@ -157,6 +159,9 @@ impl CrdsData {
3 => CrdsData::AccountsHashes(AccountsHashes::new_rand(rng, pubkey)),
4 => CrdsData::Version(Version::new_rand(rng, pubkey)),
5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)),
6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand(
rng, pubkey,
)),
_ => CrdsData::EpochSlots(
rng.gen_range(0..MAX_EPOCH_SLOTS),
EpochSlots::new_rand(rng, pubkey),
Expand Down Expand Up @@ -485,6 +490,87 @@ impl Sanitize for NodeInstance {
}
}

#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample, Debug)]
pub struct RestartLastVotedForkSlots {
pub from: Pubkey,
pub wallclock: u64,
pub slots: Vec<CompressedSlots>,
pub last_voted_hash: Hash,
pub shred_version: u16,
}

impl Sanitize for RestartLastVotedForkSlots {
fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
if self.slots.is_empty() {
return Err(SanitizeError::InvalidValue);
}
self.slots.sanitize()?;
self.last_voted_hash.sanitize()
}
}

impl RestartLastVotedForkSlots {
pub fn new(from: Pubkey, now: u64, last_voted_hash: Hash, shred_version: u16) -> Self {
Self {
from,
wallclock: now,
slots: Vec::new(),
last_voted_hash,
shred_version,
}
}

/// New random Version for tests and benchmarks.
pub fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
let mut result =
RestartLastVotedForkSlots::new(pubkey, new_rand_timestamp(rng), Hash::new_unique(), 1);
let num_slots = rng.gen_range(2..20);
let mut slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512))
.take(num_slots)
.collect::<Vec<Slot>>();
slots.sort();
result.fill(&slots);
result
}

pub fn fill(&mut self, slots: &[Slot]) -> usize {
let slots = &slots[slots.len().saturating_sub(MAX_SLOTS_PER_ENTRY)..];
let mut num = 0;
let space = self.max_compressed_slot_size();
if space == 0 {
return 0;
}
while num < slots.len() {
let mut cslot = CompressedSlots::new(space as usize);
num += cslot.add(&slots[num..]);
self.slots.push(cslot);
}
num
}

pub fn deflate(&mut self) {
for s in self.slots.iter_mut() {
let _ = s.deflate();
}
}

pub fn max_compressed_slot_size(&self) -> isize {
let len_header = serialized_size(self).unwrap();
let len_slot = serialized_size(&CompressedSlots::default()).unwrap();
MAX_CRDS_OBJECT_SIZE as isize - (len_header + len_slot) as isize
}

pub fn to_slots(&self, min_slot: Slot) -> Vec<Slot> {
self.slots
.iter()
.filter(|s| min_slot < s.first_slot() + s.num_slots() as u64)
.filter_map(|s| s.to_slots(min_slot).ok())
.flatten()
.collect()
}
}

/// Type of the replicated value
/// These are labels for values in a record that is associated with `Pubkey`
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
Expand All @@ -501,6 +587,7 @@ pub enum CrdsValueLabel {
DuplicateShred(DuplicateShredIndex, Pubkey),
SnapshotHashes(Pubkey),
ContactInfo(Pubkey),
RestartLastVotedForkSlots(Pubkey),
}

impl fmt::Display for CrdsValueLabel {
Expand All @@ -524,6 +611,9 @@ impl fmt::Display for CrdsValueLabel {
write!(f, "SnapshotHashes({})", self.pubkey())
}
CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()),
CrdsValueLabel::RestartLastVotedForkSlots(_) => {
write!(f, "RestartLastVotedForkSlots({})", self.pubkey())
}
}
}
}
Expand All @@ -543,6 +633,7 @@ impl CrdsValueLabel {
CrdsValueLabel::DuplicateShred(_, p) => *p,
CrdsValueLabel::SnapshotHashes(p) => *p,
CrdsValueLabel::ContactInfo(pubkey) => *pubkey,
CrdsValueLabel::RestartLastVotedForkSlots(p) => *p,
}
}
}
Expand Down Expand Up @@ -593,6 +684,7 @@ impl CrdsValue {
CrdsData::DuplicateShred(_, shred) => shred.wallclock,
CrdsData::SnapshotHashes(hash) => hash.wallclock,
CrdsData::ContactInfo(node) => node.wallclock(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.wallclock,
}
}
pub fn pubkey(&self) -> Pubkey {
Expand All @@ -609,6 +701,7 @@ impl CrdsValue {
CrdsData::DuplicateShred(_, shred) => shred.from,
CrdsData::SnapshotHashes(hash) => hash.from,
CrdsData::ContactInfo(node) => *node.pubkey(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.from,
}
}
pub fn label(&self) -> CrdsValueLabel {
Expand All @@ -627,6 +720,9 @@ impl CrdsValue {
CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from),
CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(self.pubkey()),
CrdsData::ContactInfo(node) => CrdsValueLabel::ContactInfo(*node.pubkey()),
CrdsData::RestartLastVotedForkSlots(_) => {
CrdsValueLabel::RestartLastVotedForkSlots(self.pubkey())
}
}
}
pub fn contact_info(&self) -> Option<&LegacyContactInfo> {
Expand Down Expand Up @@ -1073,4 +1169,58 @@ mod test {
assert!(node.should_force_push(&pubkey));
assert!(!node.should_force_push(&Pubkey::new_unique()));
}

#[test]
fn test_restart_last_voted_fork_slots() {
let keypair = Keypair::new();
let slot = 53;
let slot_parent = slot - 5;
let shred_version = 21;
let mut slots = RestartLastVotedForkSlots::new(
keypair.pubkey(),
timestamp(),
Hash::default(),
shred_version,
);
let original_slots_vec = [slot_parent, slot];
slots.fill(&original_slots_vec);
let value =
CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(slots.clone()), &keypair);
assert_eq!(value.sanitize(), Ok(()));
let label = value.label();
assert_eq!(
label,
CrdsValueLabel::RestartLastVotedForkSlots(keypair.pubkey())
);
assert_eq!(label.pubkey(), keypair.pubkey());
assert_eq!(value.wallclock(), slots.wallclock);
let retrived_slots = slots.to_slots(0);
assert_eq!(retrived_slots.len(), 2);
assert_eq!(retrived_slots[0], slot_parent);
assert_eq!(retrived_slots[1], slot);

let empty_slots = RestartLastVotedForkSlots::new(
keypair.pubkey(),
timestamp(),
Hash::default(),
shred_version,
);
let bad_value =
CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(empty_slots), &keypair);
assert_eq!(bad_value.sanitize(), Err(SanitizeError::InvalidValue));

let last_slot: Slot = (MAX_SLOTS_PER_ENTRY + 10).try_into().unwrap();
let mut large_slots = RestartLastVotedForkSlots::new(
keypair.pubkey(),
timestamp(),
Hash::default(),
shred_version,
);
let large_slots_vec: Vec<Slot> = (0..last_slot + 1).collect();
large_slots.fill(&large_slots_vec);
let retrived_slots = large_slots.to_slots(0);
assert_eq!(retrived_slots.len(), MAX_SLOTS_PER_ENTRY);
assert_eq!(retrived_slots.first(), Some(&11));
assert_eq!(retrived_slots.last(), Some(&last_slot));
}
}
2 changes: 1 addition & 1 deletion gossip/src/epoch_slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl Default for CompressedSlots {
}

impl CompressedSlots {
fn new(max_size: usize) -> Self {
pub(crate) fn new(max_size: usize) -> Self {
CompressedSlots::Uncompressed(Uncompressed::new(max_size))
}

Expand Down

0 comments on commit 0a38108

Please sign in to comment.