v2.0: verifies retransmitter signature on chained Merkle shreds (backport of #1735) #2201

Merged (1 commit, Aug 2, 2024)
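
In short, this backport has the shred-sigverify stage check, for chained and resigned Merkle shreds, that the upstream retransmitter actually signed the shred's Merkle root: the node's expected parent in the turbine broadcast tree is looked up and that parent's pubkey is used to verify the trailing retransmitter signature. A minimal, self-contained sketch of the sign/verify relationship being checked, using only solana-sdk types (the keypair and hash below are stand-ins, not values from this PR):

```rust
use solana_sdk::{
    hash::hash,
    signature::{Keypair, Signer},
};

fn main() {
    // Stand-in for the upstream turbine node that retransmits the shred.
    let retransmitter = Keypair::new();
    // Stand-in for the shred's Merkle root.
    let merkle_root = hash(b"merkle root placeholder");
    // The retransmitter signs the Merkle root (what resign_shred writes into the shred tail).
    let signature = retransmitter.sign_message(merkle_root.as_ref());
    // A downstream node verifies that signature against the pubkey of its expected turbine parent.
    assert!(signature.verify(retransmitter.pubkey().as_ref(), merkle_root.as_ref()));
}
```

Repair shreds are exempt from this check: they do not travel along the turbine tree and are verified via their trailing nonce instead.
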
1 change: 1 addition & 0 deletions Cargo.lock

Generated file; diff not rendered.

25 changes: 19 additions & 6 deletions ledger/src/shred.rs
@@ -757,11 +757,8 @@ pub mod layout {
.map(Hash::new)
}

pub(crate) fn set_retransmitter_signature(
shred: &mut [u8],
signature: &Signature,
) -> Result<(), Error> {
let offset = match get_shred_variant(shred)? {
fn get_retransmitter_signature_offset(shred: &[u8]) -> Result<usize, Error> {
match get_shred_variant(shred)? {
ShredVariant::LegacyCode | ShredVariant::LegacyData => Err(Error::InvalidShredVariant),
ShredVariant::MerkleCode {
proof_size,
@@ -777,7 +774,23 @@
} => {
merkle::ShredData::get_retransmitter_signature_offset(proof_size, chained, resigned)
}
}?;
}
}

pub fn get_retransmitter_signature(shred: &[u8]) -> Result<Signature, Error> {
let offset = get_retransmitter_signature_offset(shred)?;
shred
.get(offset..offset + SIZE_OF_SIGNATURE)
.map(|bytes| <[u8; SIZE_OF_SIGNATURE]>::try_from(bytes).unwrap())
.map(Signature::from)
.ok_or(Error::InvalidPayloadSize(shred.len()))
}

pub(crate) fn set_retransmitter_signature(
shred: &mut [u8],
signature: &Signature,
) -> Result<(), Error> {
let offset = get_retransmitter_signature_offset(shred)?;
let Some(buffer) = shred.get_mut(offset..offset + SIZE_OF_SIGNATURE) else {
return Err(Error::InvalidPayloadSize(shred.len()));
};
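
The hunk above also makes get_retransmitter_signature public. As context only (not part of this diff), a hypothetical caller outside the crate could wrap it as below, treating Error::InvalidShredVariant as "no retransmitter signature on this variant", the same way sigverify_shreds.rs does further down:

```rust
use solana_ledger::shred::{self, Error};
use solana_sdk::signature::Signature;

// Hypothetical helper: Some(signature) for resigned Merkle shreds, None for
// variants that carry no retransmitter signature; other errors are propagated.
fn retransmitter_signature(shred: &[u8]) -> Result<Option<Signature>, Error> {
    match shred::layout::get_retransmitter_signature(shred) {
        Ok(signature) => Ok(Some(signature)),
        Err(Error::InvalidShredVariant) => Ok(None),
        Err(err) => Err(err),
    }
}
```
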
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions turbine/Cargo.toml
@@ -36,6 +36,7 @@ solana-rpc-client-api = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
static_assertions = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }

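
The new static_assertions dependency is used later in this diff for const_assert_eq! in sigverify_shreds.rs. As a tiny illustration of the macro (assuming the crate is already declared in the workspace, as workspace = true implies), it turns a mismatch between two constants into a compile-time error rather than a runtime check:

```rust
use static_assertions::const_assert_eq;

const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 5;
// Compiles only while the constant equals 5; changing either side breaks the
// build instead of drifting silently at runtime.
const_assert_eq!(CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, 5);

fn main() {}
```
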
3 changes: 1 addition & 2 deletions turbine/src/cluster_nodes.rs
@@ -242,8 +242,7 @@ impl ClusterNodes<RetransmitStage> {

// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree or if it is not staked.
#[allow(unused)]
fn get_retransmit_parent(
pub(crate) fn get_retransmit_parent(
&self,
leader: &Pubkey,
shred: &ShredId,
158 changes: 140 additions & 18 deletions turbine/src/sigverify_shreds.rs
@@ -1,4 +1,8 @@
use {
crate::{
cluster_nodes::{self, ClusterNodesCache},
retransmit_stage::RetransmitStage,
},
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_gossip::cluster_info::ClusterInfo,
@@ -9,15 +13,22 @@ use {
},
solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_runtime::{
bank::{Bank, MAX_LEADER_SCHEDULE_STAKES},
bank_forks::BankForks,
},
solana_sdk::{
clock::Slot,
pubkey::Pubkey,
signature::{Keypair, Signer},
},
static_assertions::const_assert_eq,
std::{
collections::HashMap,
sync::{Arc, RwLock},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
},
thread::{Builder, JoinHandle},
time::{Duration, Instant},
},
@@ -30,6 +41,16 @@ const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);

// Num epochs capacity should be at least 2 because near the epoch boundary we
// may receive shreds from the other side of the epoch boundary. Because of the
// TTL based eviction it does not make sense to cache more than
// MAX_LEADER_SCHEDULE_STAKES epochs.
const_assert_eq!(CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, 5);
const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = MAX_LEADER_SCHEDULE_STAKES as usize;
// Because for ClusterNodes::get_retransmit_parent only pubkeys of staked nodes
// are needed, we can use longer durations for cache TTL.
const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(30);

#[allow(clippy::enum_variant_names)]
enum Error {
RecvDisconnected,
@@ -48,6 +69,10 @@ pub fn spawn_shred_sigverify(
let recycler_cache = RecyclerCache::warmed();
let mut stats = ShredSigVerifyStats::new(Instant::now());
let cache = RwLock::new(LruCache::new(SIGVERIFY_LRU_CACHE_CAPACITY));
let cluster_nodes_cache = ClusterNodesCache::<RetransmitStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
);
let thread_pool = ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|i| format!("solSvrfyShred{i:02}"))
@@ -66,13 +91,15 @@
match run_shred_sigverify(
&thread_pool,
&keypair,
&cluster_info,
&bank_forks,
&leader_schedule_cache,
&recycler_cache,
&deduper,
&shred_fetch_receiver,
&retransmit_sender,
&verified_sender,
&cluster_nodes_cache,
&cache,
&mut stats,
) {
@@ -94,13 +121,15 @@
fn run_shred_sigverify<const K: usize>(
thread_pool: &ThreadPool,
keypair: &Keypair,
cluster_info: &ClusterInfo,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
recycler_cache: &RecyclerCache,
deduper: &Deduper<K, [u8]>,
shred_fetch_receiver: &Receiver<PacketBatch>,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: &Sender<Vec<PacketBatch>>,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
cache: &RwLock<LruCache>,
stats: &mut ShredSigVerifyStats,
) -> Result<(), Error> {
@@ -128,34 +157,59 @@ fn run_shred_sigverify<const K: usize>(
.map(|packet| packet.meta_mut().set_discard(true))
.count()
});
let (working_bank, root_bank) = {
let bank_forks = bank_forks.read().unwrap();
(bank_forks.working_bank(), bank_forks.root_bank())
};
verify_packets(
thread_pool,
&keypair.pubkey(),
bank_forks,
&working_bank,
leader_schedule_cache,
recycler_cache,
&mut packets,
cache,
);
stats.num_discards_post += count_discards(&packets);
// Resign shreds Merkle root as the retransmitter node.
// Verify retransmitter's signature, and resign shreds
// Merkle root as the retransmitter node.
let resign_start = Instant::now();
thread_pool.install(|| {
packets
.par_iter_mut()
.flatten()
.filter(|packet| !packet.meta().discard())
.for_each(|packet| {
if let Some(shred) = shred::layout::get_shred_mut(packet) {
// We can ignore Error::InvalidShredVariant because that
// basically means that the shred is of a variant which
// cannot be signed by the retransmitter node.
if !matches!(
shred::layout::resign_shred(shred, keypair),
Ok(()) | Err(shred::Error::InvalidShredVariant)
) {
packet.meta_mut().set_discard(true);
}
let repair = packet.meta().repair();
let Some(shred) = shred::layout::get_shred_mut(packet) else {
packet.meta_mut().set_discard(true);
return;
};
// Repair packets do not follow turbine tree and
// are verified using the trailing nonce.
if !repair
&& !verify_retransmitter_signature(
shred,
&root_bank,
&working_bank,
cluster_info,
leader_schedule_cache,
cluster_nodes_cache,
stats,
)
{
stats
.num_invalid_retransmitter
.fetch_add(1, Ordering::Relaxed);
}
// We can ignore Error::InvalidShredVariant because that
// basically means that the shred is of a variant which
// cannot be signed by the retransmitter node.
if !matches!(
shred::layout::resign_shred(shred, keypair),
Ok(()) | Err(shred::Error::InvalidShredVariant)
) {
packet.meta_mut().set_discard(true);
}
})
});
@@ -175,18 +229,64 @@ fn run_shred_sigverify<const K: usize>(
Ok(())
}

#[must_use]
fn verify_retransmitter_signature(
shred: &[u8],
root_bank: &Bank,
working_bank: &Bank,
cluster_info: &ClusterInfo,
leader_schedule_cache: &LeaderScheduleCache,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
stats: &ShredSigVerifyStats,
) -> bool {
let signature = match shred::layout::get_retransmitter_signature(shred) {
Ok(signature) => signature,
// If the shred is not of resigned variant,
// then there is nothing to verify.
Err(shred::Error::InvalidShredVariant) => return true,
Err(_) => return false,
};
let Some(merkle_root) = shred::layout::get_merkle_root(shred) else {
return false;
};
let Some(shred) = shred::layout::get_shred_id(shred) else {
return false;
};
let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), Some(working_bank))
else {
stats
.num_unknown_slot_leader
.fetch_add(1, Ordering::Relaxed);
return false;
};
let cluster_nodes =
cluster_nodes_cache.get(shred.slot(), root_bank, working_bank, cluster_info);
let data_plane_fanout = cluster_nodes::get_data_plane_fanout(shred.slot(), root_bank);
let parent = match cluster_nodes.get_retransmit_parent(&leader, &shred, data_plane_fanout) {
Ok(Some(parent)) => parent,
Ok(None) => return true,
Err(err) => {
error!("get_retransmit_parent: {err:?}");
stats
.num_unknown_turbine_parent
.fetch_add(1, Ordering::Relaxed);
return false;
}
};
signature.verify(parent.as_ref(), merkle_root.as_ref())
}

fn verify_packets(
thread_pool: &ThreadPool,
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
working_bank: &Bank,
leader_schedule_cache: &LeaderScheduleCache,
recycler_cache: &RecyclerCache,
packets: &mut [PacketBatch],
cache: &RwLock<LruCache>,
) {
let working_bank = bank_forks.read().unwrap().working_bank();
let leader_slots: HashMap<Slot, Pubkey> =
get_slot_leaders(self_pubkey, packets, leader_schedule_cache, &working_bank)
get_slot_leaders(self_pubkey, packets, leader_schedule_cache, working_bank)
.into_iter()
.filter_map(|(slot, pubkey)| Some((slot, pubkey?)))
.chain(std::iter::once((Slot::MAX, Pubkey::default())))
@@ -262,7 +362,10 @@ struct ShredSigVerifyStats {
num_discards_post: usize,
num_discards_pre: usize,
num_duplicates: usize,
num_invalid_retransmitter: AtomicUsize,
num_retransmit_shreds: usize,
num_unknown_slot_leader: AtomicUsize,
num_unknown_turbine_parent: AtomicUsize,
elapsed_micros: u64,
resign_micros: u64,
}
@@ -280,7 +383,10 @@ impl ShredSigVerifyStats {
num_deduper_saturations: 0usize,
num_discards_post: 0usize,
num_duplicates: 0usize,
num_invalid_retransmitter: AtomicUsize::default(),
num_retransmit_shreds: 0usize,
num_unknown_slot_leader: AtomicUsize::default(),
num_unknown_turbine_parent: AtomicUsize::default(),
elapsed_micros: 0u64,
resign_micros: 0u64,
}
@@ -299,7 +405,22 @@ impl ShredSigVerifyStats {
("num_deduper_saturations", self.num_deduper_saturations, i64),
("num_discards_post", self.num_discards_post, i64),
("num_duplicates", self.num_duplicates, i64),
(
"num_invalid_retransmitter",
self.num_invalid_retransmitter.load(Ordering::Relaxed),
i64
),
("num_retransmit_shreds", self.num_retransmit_shreds, i64),
(
"num_unknown_slot_leader",
self.num_unknown_slot_leader.load(Ordering::Relaxed),
i64
),
(
"num_unknown_turbine_parent",
self.num_unknown_turbine_parent.load(Ordering::Relaxed),
i64
),
("elapsed_micros", self.elapsed_micros, i64),
("resign_micros", self.resign_micros, i64),
);
@@ -365,10 +486,11 @@ mod tests {

let cache = RwLock::new(LruCache::new(/*capacity:*/ 128));
let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
let working_bank = bank_forks.read().unwrap().working_bank();
verify_packets(
&thread_pool,
&Pubkey::new_unique(), // self_pubkey
&bank_forks,
&working_bank,
&leader_schedule_cache,
&RecyclerCache::warmed(),
&mut batches,