Skip to content

Commit

Permalink
gossip: process duplicate proofs for merkle root conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
AshwinSekar committed Nov 14, 2023
1 parent b741505 commit 730ceb5
Show file tree
Hide file tree
Showing 2 changed files with 217 additions and 27 deletions.
2 changes: 1 addition & 1 deletion gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "CVvKB495YW6JN4w1rWwajyZmG5wvNhmD97V99rSv9fGw")]
#[frozen_abi(digest = "CroFF8MTW2fxatwv7ALz9Wde4e9L9yE4L59yebM3XuWe")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
Expand Down
242 changes: 216 additions & 26 deletions gossip/src/duplicate_shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use {
},
solana_sdk::{
clock::Slot,
hash::Hash,
pubkey::Pubkey,
sanitize::{Sanitize, SanitizeError},
},
Expand All @@ -30,7 +31,7 @@ pub struct DuplicateShred {
pub(crate) wallclock: u64,
pub(crate) slot: Slot,
_unused: u32,
shred_type: ShredType,
_unused_shred_type: ShredType,
// Serialized DuplicateSlotProof split into chunks.
num_chunks: u8,
chunk_index: u8,
Expand Down Expand Up @@ -66,6 +67,8 @@ pub enum Error {
InvalidErasureMetaConflict,
#[error("invalid last index conflict")]
InvalidLastIndexConflict,
#[error("invalid merkle root conflict")]
InvalidMerkleRootConflict,
#[error("invalid signature")]
InvalidSignature,
#[error("invalid size limit")]
Expand All @@ -78,8 +81,6 @@ pub enum Error {
MissingDataChunk,
#[error("(de)serialization error")]
SerializationError(#[from] bincode::Error),
#[error("shred type mismatch")]
ShredTypeMismatch,
#[error("slot mismatch")]
SlotMismatch,
#[error("type conversion error")]
Expand All @@ -90,8 +91,8 @@ pub enum Error {

/// Check that `shred1` and `shred2` indicate a valid duplicate proof
/// - Must be for the same slot
/// - Must have the same `shred_type`
/// - Must both sigverify for the correct leader
/// - Must have a merkle root conflict, otherwise `shred1` and `shred2` must have the same `shred_type`
/// - If `shred1` and `shred2` share the same index they must be not equal
/// - If `shred1` and `shred2` do not share the same index and are data shreds
/// verify that they indicate an index conflict. One of them must be the
Expand All @@ -106,10 +107,6 @@ where
return Err(Error::SlotMismatch);
}

if shred1.shred_type() != shred2.shred_type() {
return Err(Error::ShredTypeMismatch);
}

if let Some(leader_schedule) = leader_schedule {
let slot_leader =
leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader(shred1.slot()))?;
Expand All @@ -118,6 +115,23 @@ where
}
}

// Merkle root conflict check
if let Some((mr1, mr2)) = shred1.merkle_root().zip(shred2.merkle_root()) {
// Hash::default check to exclude legacy shreds
if shred1.fec_set_index() == shred2.fec_set_index()
&& mr1 != mr2
&& mr1 != Hash::default()
&& mr2 != Hash::default()
{
return Ok(());
}
}

if shred1.shred_type() != shred2.shred_type() {
// Only valid proof here is a merkle conflict which was checked above
return Err(Error::InvalidMerkleRootConflict);
}

if shred1.index() == shred2.index() {
if shred1.payload() != shred2.payload() {
return Ok(());
Expand Down Expand Up @@ -164,7 +178,7 @@ where
}
let other_shred = Shred::new_from_serialized_shred(other_payload)?;
check_shreds(leader_schedule, &shred, &other_shred)?;
let (slot, shred_type) = (shred.slot(), shred.shred_type());
let slot = shred.slot();
let proof = DuplicateSlotProof {
shred1: shred.into_payload(),
shred2: other_shred.into_payload(),
Expand All @@ -184,27 +198,21 @@ where
from: self_pubkey,
wallclock,
slot,
shred_type,
num_chunks,
chunk_index: i as u8,
chunk,
_unused: 0,
_unused_shred_type: ShredType::Code,
});
Ok(chunks)
}

// Returns a predicate checking if a duplicate-shred chunk matches
// (slot, shred_type) and has valid chunk_index.
fn check_chunk(
slot: Slot,
shred_type: ShredType,
num_chunks: u8,
) -> impl Fn(&DuplicateShred) -> Result<(), Error> {
// the slot and has valid chunk_index.
fn check_chunk(slot: Slot, num_chunks: u8) -> impl Fn(&DuplicateShred) -> Result<(), Error> {
move |dup| {
if dup.slot != slot {
Err(Error::SlotMismatch)
} else if dup.shred_type != shred_type {
Err(Error::ShredTypeMismatch)
} else if dup.num_chunks != num_chunks {
Err(Error::NumChunksMismatch)
} else if dup.chunk_index >= num_chunks {
Expand All @@ -226,13 +234,12 @@ pub(crate) fn into_shreds(
let mut chunks = chunks.into_iter();
let DuplicateShred {
slot,
shred_type,
num_chunks,
chunk_index,
chunk,
..
} = chunks.next().ok_or(Error::InvalidDuplicateShreds)?;
let check_chunk = check_chunk(slot, shred_type, num_chunks);
let check_chunk = check_chunk(slot, num_chunks);
let mut data = HashMap::new();
data.insert(chunk_index, chunk);
for chunk in chunks {
Expand Down Expand Up @@ -260,8 +267,6 @@ pub(crate) fn into_shreds(
let shred2 = Shred::new_from_serialized_shred(proof.shred2)?;
if shred1.slot() != slot || shred2.slot() != slot {
Err(Error::SlotMismatch)
} else if shred1.shred_type() != shred_type || shred2.shred_type() != shred_type {
Err(Error::ShredTypeMismatch)
} else {
check_shreds(Some(|_| Some(slot_leader).copied()), &shred1, &shred2)?;
Ok((shred1, shred2))
Expand Down Expand Up @@ -300,7 +305,7 @@ pub(crate) mod tests {
from: Pubkey::new_unique(),
wallclock: u64::MAX,
slot: Slot::MAX,
shred_type: ShredType::Data,
_unused_shred_type: ShredType::Data,
num_chunks: u8::MAX,
chunk_index: u8::MAX,
chunk: Vec::default(),
Expand Down Expand Up @@ -421,7 +426,7 @@ pub(crate) mod tests {
wallclock: u64,
max_size: usize, // Maximum serialized size of each DuplicateShred.
) -> Result<impl Iterator<Item = DuplicateShred>, Error> {
let (slot, shred_type) = (shred.slot(), shred.shred_type());
let slot = shred.slot();
let proof = DuplicateSlotProof {
shred1: shred.into_payload(),
shred2: other_shred.into_payload(),
Expand All @@ -437,11 +442,11 @@ pub(crate) mod tests {
from: self_pubkey,
wallclock,
slot,
shred_type,
num_chunks,
chunk_index: i as u8,
chunk,
_unused: 0,
_unused_shred_type: ShredType::Code,
});
Ok(chunks)
}
Expand Down Expand Up @@ -815,6 +820,14 @@ pub(crate) mod tests {
&leader,
merkle_variant,
);
let coding_shreds_different_merkle_root = new_rand_coding_shreds(
&mut rng,
next_shred_index,
10,
&shredder,
&leader,
merkle_variant,
);
let coding_shreds_bigger = new_rand_coding_shreds(
&mut rng,
next_shred_index,
Expand All @@ -833,10 +846,18 @@ pub(crate) mod tests {
);

// Same fec-set, different index, different erasure meta
let test_cases = vec![
let mut test_cases = vec![
(coding_shreds[0].clone(), coding_shreds_bigger[1].clone()),
(coding_shreds[0].clone(), coding_shreds_smaller[1].clone()),
];
if merkle_variant {
// Same erasure config, different merkle root is still different
// erasure meta
test_cases.push((
coding_shreds[0].clone(),
coding_shreds_different_merkle_root[0].clone(),
));
}
for (shred1, shred2) in test_cases.into_iter() {
let chunks: Vec<_> = from_shred(
shred1.clone(),
Expand Down Expand Up @@ -949,4 +970,173 @@ pub(crate) mod tests {
);
}
}

#[test]
fn test_merkle_root_conflict_round_trip() {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen_range(0..31_000);
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
} else {
None
}
};
let (coding_shreds, data_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
true,
&shredder,
&leader,
false,
);
let (diff_coding_shreds, diff_data_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
true,
&shredder,
&leader,
false,
);

let test_cases = vec![
(data_shreds[0].clone(), diff_data_shreds[1].clone()),
(coding_shreds[0].clone(), diff_coding_shreds[1].clone()),
(data_shreds[0].clone(), diff_coding_shreds[0].clone()),
(coding_shreds[0].clone(), diff_data_shreds[0].clone()),
];
for (shred1, shred2) in test_cases.into_iter() {
let chunks: Vec<_> = from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.payload().clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size
)
.unwrap()
.collect();
assert!(chunks.len() > 4);
let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap();
assert_eq!(shred1, shred3);
assert_eq!(shred2, shred4);
}
}

#[test]
fn test_merkle_root_conflict_invalid() {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen_range(0..31_000);
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
} else {
None
}
};

let (data_shreds, coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
true,
&shredder,
&leader,
true,
);

let (next_data_shreds, next_coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index + 1,
next_shred_index + 1,
10,
true,
&shredder,
&leader,
true,
);

let (legacy_data_shreds, legacy_coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
false,
&shredder,
&leader,
true,
);

let test_cases = vec![
// Same fec set same merkle root
(coding_shreds[0].clone(), data_shreds[0].clone()),
(data_shreds[0].clone(), coding_shreds[0].clone()),
// Different FEC set different merkle root
(coding_shreds[0].clone(), next_data_shreds[0].clone()),
(next_coding_shreds[0].clone(), data_shreds[0].clone()),
(data_shreds[0].clone(), next_coding_shreds[0].clone()),
(next_data_shreds[0].clone(), coding_shreds[0].clone()),
// Legacy shreds
(
legacy_coding_shreds[0].clone(),
legacy_data_shreds[0].clone(),
),
(
legacy_data_shreds[0].clone(),
legacy_coding_shreds[0].clone(),
),
// Mix of legacy and merkle
(legacy_coding_shreds[0].clone(), data_shreds[0].clone()),
(coding_shreds[0].clone(), legacy_data_shreds[0].clone()),
(legacy_data_shreds[0].clone(), coding_shreds[0].clone()),
(data_shreds[0].clone(), legacy_coding_shreds[0].clone()),
// Mix of legacy and merkle with different fec index
(legacy_coding_shreds[0].clone(), next_data_shreds[0].clone()),
(next_coding_shreds[0].clone(), legacy_data_shreds[0].clone()),
(legacy_data_shreds[0].clone(), next_coding_shreds[0].clone()),
(next_data_shreds[0].clone(), legacy_coding_shreds[0].clone()),
];
for (shred1, shred2) in test_cases.into_iter() {
assert_matches!(
from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.payload().clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size
)
.err()
.unwrap(),
Error::InvalidMerkleRootConflict
);

let chunks: Vec<_> = from_shred_bypass_checks(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.clone(),
rng.gen(), // wallclock
512, // max_size
)
.unwrap()
.collect();
assert!(chunks.len() > 4);

assert_matches!(
into_shreds(&leader.pubkey(), chunks).err().unwrap(),
Error::InvalidMerkleRootConflict
);
}
}
}

0 comments on commit 730ceb5

Please sign in to comment.