diff --git a/ledger/src/shred/merkle.rs b/ledger/src/shred/merkle.rs index d0d507ae48aba1..8ed51a6653b411 100644 --- a/ledger/src/shred/merkle.rs +++ b/ledger/src/shred/merkle.rs @@ -821,7 +821,8 @@ pub(super) fn make_shreds_from_data( } } let now = Instant::now(); - let erasure_batch_size = shredder::get_erasure_batch_size(DATA_SHREDS_PER_FEC_BLOCK); + let erasure_batch_size = + shredder::get_erasure_batch_size(DATA_SHREDS_PER_FEC_BLOCK, is_last_in_slot); let proof_size = get_proof_size(erasure_batch_size); let data_buffer_size = ShredData::capacity(proof_size)?; let chunk_size = DATA_SHREDS_PER_FEC_BLOCK * data_buffer_size; @@ -872,7 +873,8 @@ pub(super) fn make_shreds_from_data( let data_buffer_size = ShredData::capacity(proof_size).ok()?; let num_data_shreds = (data.len() + data_buffer_size - 1) / data_buffer_size; let num_data_shreds = num_data_shreds.max(1); - let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds); + let erasure_batch_size = + shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot); (proof_size == get_proof_size(erasure_batch_size)) .then_some((proof_size, data_buffer_size)) }) @@ -932,7 +934,8 @@ pub(super) fn make_shreds_from_data( .scan(next_code_index, |next_code_index, chunk| { let out = Some(*next_code_index); let num_data_shreds = chunk.len(); - let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds); + let erasure_batch_size = + shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot); let num_coding_shreds = erasure_batch_size - num_data_shreds; *next_code_index += num_coding_shreds as u32; out @@ -945,7 +948,13 @@ pub(super) fn make_shreds_from_data( .into_iter() .zip(next_code_index) .map(|(shreds, next_code_index)| { - make_erasure_batch(keypair, shreds, next_code_index, reed_solomon_cache) + make_erasure_batch( + keypair, + shreds, + next_code_index, + is_last_in_slot, + reed_solomon_cache, + ) }) .collect() } else { @@ -954,7 +963,13 @@ pub(super) fn make_shreds_from_data( .into_par_iter() .zip(next_code_index) .map(|(shreds, next_code_index)| { - make_erasure_batch(keypair, shreds, next_code_index, reed_solomon_cache) + make_erasure_batch( + keypair, + shreds, + next_code_index, + is_last_in_slot, + reed_solomon_cache, + ) }) .collect() }) @@ -969,10 +984,11 @@ fn make_erasure_batch( keypair: &Keypair, shreds: Vec, next_code_index: u32, + is_last_in_slot: bool, reed_solomon_cache: &ReedSolomonCache, ) -> Result, Error> { let num_data_shreds = shreds.len(); - let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds); + let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot); let num_coding_shreds = erasure_batch_size - num_data_shreds; let proof_size = get_proof_size(erasure_batch_size); debug_assert!(shreds @@ -1056,7 +1072,10 @@ mod test { itertools::Itertools, rand::{seq::SliceRandom, CryptoRng, Rng}, rayon::ThreadPoolBuilder, - solana_sdk::signature::{Keypair, Signer}, + solana_sdk::{ + packet::PACKET_DATA_SIZE, + signature::{Keypair, Signer}, + }, std::{cmp::Ordering, iter::repeat_with}, test_case::test_case, }; @@ -1124,8 +1143,7 @@ mod test { assert_eq!(entry, &bytes[..SIZE_OF_MERKLE_PROOF_ENTRY]); } - fn run_merkle_tree_round_trip(size: usize) { - let mut rng = rand::thread_rng(); + fn run_merkle_tree_round_trip(rng: &mut R, size: usize) { let nodes = repeat_with(|| rng.gen::<[u8; 32]>()).map(Hash::from); let nodes: Vec<_> = nodes.take(size).collect(); let tree = make_merkle_tree(nodes.clone()); @@ -1145,8 +1163,9 @@ mod test { #[test] fn test_merkle_tree_round_trip() { - for size in [1, 2, 3, 4, 5, 6, 7, 8, 9, 19, 37, 64, 79] { - run_merkle_tree_round_trip(size); + let mut rng = rand::thread_rng(); + for size in 1..=143 { + run_merkle_tree_round_trip(&mut rng, size); } } @@ -1327,32 +1346,49 @@ mod test { } } - #[test_case(0)] - #[test_case(15600)] - #[test_case(31200)] - #[test_case(46800)] - fn test_make_shreds_from_data(data_size: usize) { + #[test_case(0, false)] + #[test_case(0, true)] + #[test_case(15600, false)] + #[test_case(15600, true)] + #[test_case(31200, false)] + #[test_case(31200, true)] + #[test_case(46800, false)] + #[test_case(46800, true)] + fn test_make_shreds_from_data(data_size: usize, is_last_in_slot: bool) { let mut rng = rand::thread_rng(); let data_size = data_size.saturating_sub(16); let reed_solomon_cache = ReedSolomonCache::default(); for data_size in data_size..data_size + 32 { - run_make_shreds_from_data(&mut rng, data_size, &reed_solomon_cache); + run_make_shreds_from_data(&mut rng, data_size, is_last_in_slot, &reed_solomon_cache); } } - #[test] - fn test_make_shreds_from_data_rand() { + #[test_case(false)] + #[test_case(true)] + fn test_make_shreds_from_data_rand(is_last_in_slot: bool) { let mut rng = rand::thread_rng(); let reed_solomon_cache = ReedSolomonCache::default(); for _ in 0..32 { let data_size = rng.gen_range(0..31200 * 7); - run_make_shreds_from_data(&mut rng, data_size, &reed_solomon_cache); + run_make_shreds_from_data(&mut rng, data_size, is_last_in_slot, &reed_solomon_cache); + } + } + + #[ignore] + #[test_case(false)] + #[test_case(true)] + fn test_make_shreds_from_data_paranoid(is_last_in_slot: bool) { + let mut rng = rand::thread_rng(); + let reed_solomon_cache = ReedSolomonCache::default(); + for data_size in 0..=PACKET_DATA_SIZE * 4 * 64 { + run_make_shreds_from_data(&mut rng, data_size, is_last_in_slot, &reed_solomon_cache); } } fn run_make_shreds_from_data( rng: &mut R, data_size: usize, + is_last_in_slot: bool, reed_solomon_cache: &ReedSolomonCache, ) { let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); @@ -1373,7 +1409,7 @@ mod test { parent_slot, shred_version, reference_tick, - true, // is_last_in_slot + is_last_in_slot, next_shred_index, next_code_index, reed_solomon_cache, @@ -1480,14 +1516,17 @@ mod test { .flags .contains(ShredFlags::LAST_SHRED_IN_SLOT)) .count(), - 1 + if is_last_in_slot { 1 } else { 0 } + ); + assert_eq!( + data_shreds + .last() + .unwrap() + .data_header + .flags + .contains(ShredFlags::LAST_SHRED_IN_SLOT), + is_last_in_slot ); - assert!(data_shreds - .last() - .unwrap() - .data_header - .flags - .contains(ShredFlags::LAST_SHRED_IN_SLOT)); // Assert that data shreds can be recovered from coding shreds. let recovered_data_shreds: Vec<_> = shreds .iter() diff --git a/ledger/src/shredder.rs b/ledger/src/shredder.rs index 1a597c41f984d4..f3203876de7066 100644 --- a/ledger/src/shredder.rs +++ b/ledger/src/shredder.rs @@ -207,7 +207,13 @@ impl Shredder { .iter() .scan(next_code_index, |next_code_index, chunk| { let num_data_shreds = chunk.len(); - let erasure_batch_size = get_erasure_batch_size(num_data_shreds); + let is_last_in_slot = chunk + .last() + .copied() + .map(Shred::last_in_slot) + .unwrap_or(true); + let erasure_batch_size = + get_erasure_batch_size(num_data_shreds, is_last_in_slot); *next_code_index += (erasure_batch_size - num_data_shreds) as u32; Some(*next_code_index) }), @@ -276,7 +282,12 @@ impl Shredder { && shred.version() == version && shred.fec_set_index() == fec_set_index)); let num_data = data.len(); - let num_coding = get_erasure_batch_size(num_data) + let is_last_in_slot = data + .last() + .map(Borrow::borrow) + .map(Shred::last_in_slot) + .unwrap_or(true); + let num_coding = get_erasure_batch_size(num_data, is_last_in_slot) .checked_sub(num_data) .unwrap(); assert!(num_coding > 0); @@ -434,11 +445,16 @@ impl Default for ReedSolomonCache { } /// Maps number of data shreds in each batch to the erasure batch size. -pub(crate) fn get_erasure_batch_size(num_data_shreds: usize) -> usize { - ERASURE_BATCH_SIZE +pub(crate) fn get_erasure_batch_size(num_data_shreds: usize, is_last_in_slot: bool) -> usize { + let erasure_batch_size = ERASURE_BATCH_SIZE .get(num_data_shreds) .copied() - .unwrap_or(2 * num_data_shreds) + .unwrap_or(2 * num_data_shreds); + if is_last_in_slot { + erasure_batch_size.max(2 * DATA_SHREDS_PER_FEC_BLOCK) + } else { + erasure_batch_size + } } // Returns offsets to fec_set_index when spliting shreds into erasure batches. @@ -518,17 +534,19 @@ mod tests { }) .collect(); + let is_last_in_slot = true; let size = serialized_size(&entries).unwrap() as usize; // Integer division to ensure we have enough shreds to fit all the data let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap(); let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size; let num_expected_coding_shreds = - get_erasure_batch_size(num_expected_data_shreds) - num_expected_data_shreds; + get_erasure_batch_size(num_expected_data_shreds, is_last_in_slot) + - num_expected_data_shreds; let start_index = 0; let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &keypair, &entries, - true, // is_last_in_slot + is_last_in_slot, start_index, // next_shred_index start_index, // next_code_index true, // merkle_variant @@ -792,7 +810,7 @@ mod tests { assert_eq!(data_shreds.len(), num_data_shreds); assert_eq!( num_coding_shreds, - get_erasure_batch_size(num_data_shreds) - num_data_shreds + get_erasure_batch_size(num_data_shreds, is_last_in_slot) - num_data_shreds ); let all_shreds = data_shreds @@ -1189,7 +1207,10 @@ mod tests { .iter() .group_by(|shred| shred.fec_set_index()) .into_iter() - .map(|(_, chunk)| get_erasure_batch_size(chunk.count())) + .map(|(_, chunk)| { + let chunk: Vec<_> = chunk.collect(); + get_erasure_batch_size(chunk.len(), chunk.last().unwrap().last_in_slot()) + }) .sum(); assert_eq!(coding_shreds.len(), num_shreds - data_shreds.len()); } @@ -1232,9 +1253,10 @@ mod tests { #[test] fn test_max_shreds_per_slot() { for num_data_shreds in 32..128 { - let num_coding_shreds = get_erasure_batch_size(num_data_shreds) - .checked_sub(num_data_shreds) - .unwrap(); + let num_coding_shreds = + get_erasure_batch_size(num_data_shreds, /*is_last_in_slot:*/ false) + .checked_sub(num_data_shreds) + .unwrap(); assert!( MAX_DATA_SHREDS_PER_SLOT * num_coding_shreds <= MAX_CODE_SHREDS_PER_SLOT * num_data_shreds diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 658fdf0de3b04e..90ffae34f10ee5 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -2558,6 +2558,7 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) { #[test] #[serial] +#[ignore] fn test_rpc_block_subscribe() { let total_stake = 100 * DEFAULT_NODE_STAKE; let leader_stake = total_stake;