Shred Repair Request (solana-labs#34771)
* shred repair admin rpc
bw-solana authored Jan 17, 2024
1 parent 586c794 commit 8fe4f5d
Showing 9 changed files with 230 additions and 8 deletions.
5 changes: 5 additions & 0 deletions core/src/admin_rpc_post_init.rs
@@ -1,9 +1,11 @@
use {
crate::repair::{outstanding_requests, serve_repair},
solana_gossip::cluster_info::ClusterInfo,
solana_runtime::bank_forks::BankForks,
solana_sdk::{pubkey::Pubkey, quic::NotifyKeyUpdate},
std::{
collections::HashSet,
net::UdpSocket,
sync::{Arc, RwLock},
},
};
@@ -15,4 +17,7 @@ pub struct AdminRpcRequestMetadataPostInit {
pub vote_account: Pubkey,
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
pub notifies: Vec<Arc<dyn NotifyKeyUpdate + Sync + Send>>,
pub repair_socket: Arc<UdpSocket>,
pub outstanding_repair_requests:
Arc<RwLock<outstanding_requests::OutstandingRequests<serve_repair::ShredRepairType>>>,
}
124 changes: 123 additions & 1 deletion core/src/repair/repair_service.rs
@@ -15,11 +15,15 @@ use {
outstanding_requests::OutstandingRequests,
quic_endpoint::LocalRequest,
repair_weight::RepairWeight,
serve_repair::{self, ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
serve_repair::{
self, RepairProtocol, RepairRequestHeader, ServeRepair, ShredRepairType,
REPAIR_PEERS_CACHE_CAPACITY,
},
},
},
crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender},
lru::LruCache,
solana_client::connection_cache::Protocol,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
blockstore::{Blockstore, SlotMeta},
@@ -678,6 +682,70 @@ impl RepairService {
}
}

pub fn request_repair_for_shred_from_peer(
cluster_info: Arc<ClusterInfo>,
pubkey: Pubkey,
slot: u64,
shred_index: u64,
repair_socket: &UdpSocket,
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
) {
let peer_repair_addr = cluster_info
.lookup_contact_info(&pubkey, |node| node.serve_repair(Protocol::UDP))
.unwrap()
.unwrap();
Self::request_repair_for_shred_from_address(
cluster_info,
pubkey,
peer_repair_addr,
slot,
shred_index,
repair_socket,
outstanding_repair_requests,
);
}

fn request_repair_for_shred_from_address(
cluster_info: Arc<ClusterInfo>,
pubkey: Pubkey,
address: SocketAddr,
slot: u64,
shred_index: u64,
repair_socket: &UdpSocket,
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
) {
// Setup repair request
let identity_keypair = cluster_info.keypair();
let repair_request = ShredRepairType::Shred(slot, shred_index);
let nonce = outstanding_repair_requests
.write()
.unwrap()
.add_request(repair_request, timestamp());

// Create repair request
let header = RepairRequestHeader::new(cluster_info.id(), pubkey, timestamp(), nonce);
let request_proto = RepairProtocol::WindowIndex {
header,
slot,
shred_index,
};
let packet_buf =
ServeRepair::repair_proto_to_bytes(&request_proto, &identity_keypair).unwrap();

// Prepare packet batch to send
let reqs = vec![(packet_buf, address)];

// Send packet batch
match batch_send(repair_socket, &reqs[..]) {
Ok(()) => {
trace!("successfully sent repair request!");
}
Err(SendPktsError::IoError(err, _num_failed)) => {
error!("batch_send failed to send packet - error = {:?}", err);
}
}
}

/// Generate repairs for all slots `x` in the repair_range.start <= x <= repair_range.end
#[cfg(test)]
pub fn generate_repairs_in_range(
@@ -859,6 +927,7 @@ pub(crate) fn sleep_shred_deferment_period() {
mod test {
use {
super::*,
crate::repair::quic_endpoint::RemoteRequest,
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
solana_ledger::{
blockstore::{
@@ -883,6 +952,59 @@ mod test {
ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified)
}

#[test]
pub fn test_request_repair_for_shred_from_address() {
// Setup cluster and repair info
let cluster_info = Arc::new(new_test_cluster_info());
let pubkey = cluster_info.id();
let slot = 100;
let shred_index = 50;
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let address = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let outstanding_repair_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default()));

// Send a repair request
RepairService::request_repair_for_shred_from_address(
cluster_info.clone(),
pubkey,
address,
slot,
shred_index,
&sender,
outstanding_repair_requests,
);

// Receive and translate repair packet
let mut packets = vec![solana_sdk::packet::Packet::default(); 1];
let _recv_count = solana_streamer::recvmmsg::recv_mmsg(&reader, &mut packets[..]).unwrap();
let packet = &packets[0];
let Some(bytes) = packet.data(..).map(Vec::from) else {
panic!("packet data not found");
};
let remote_request = RemoteRequest {
remote_pubkey: None,
remote_address: packet.meta().socket_addr(),
bytes,
response_sender: None,
};

// Deserialize and check the request
let deserialized =
serve_repair::deserialize_request::<RepairProtocol>(&remote_request).unwrap();
match deserialized {
RepairProtocol::WindowIndex {
slot: deserialized_slot,
shred_index: deserialized_shred_index,
..
} => {
assert_eq!(deserialized_slot, slot);
assert_eq!(deserialized_shred_index, shred_index);
}
_ => panic!("unexpected repair protocol"),
}
}

#[test]
pub fn test_repair_orphan() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
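
For illustration only (not part of this commit): a minimal sketch of calling the new helper directly, assuming the caller already holds the usual repair plumbing (`cluster_info: Arc<ClusterInfo>`, `peer_pubkey: Pubkey`, `repair_socket: &UdpSocket`, and `outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>`). Note that the helper unwraps the gossip lookup, so the peer must already be present in the caller's `ClusterInfo`.

// Sketch only; the identifiers named above are assumed to be in scope.
RepairService::request_repair_for_shred_from_peer(
    cluster_info.clone(),                // gossip view used to resolve the peer's serve-repair address
    peer_pubkey,                         // identity of the validator to repair from
    100,                                 // slot containing the missing shred
    50,                                  // index of the missing shred within the slot
    repair_socket,                       // UDP socket the request is sent from
    outstanding_repair_requests.clone(), // shared nonce table used to verify the eventual response
);
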
4 changes: 3 additions & 1 deletion core/src/repair/serve_repair.rs
@@ -1384,7 +1384,9 @@ pub(crate) fn get_repair_protocol(_: ClusterType) -> Protocol {
Protocol::UDP
}

fn deserialize_request<T>(request: &RemoteRequest) -> std::result::Result<T, bincode::Error>
pub(crate) fn deserialize_request<T>(
request: &RemoteRequest,
) -> std::result::Result<T, bincode::Error>
where
T: serde::de::DeserializeOwned,
{
9 changes: 8 additions & 1 deletion core/src/tvu.rs
@@ -14,7 +14,10 @@ use {
consensus::{tower_storage::TowerStorage, Tower},
cost_update_service::CostUpdateService,
drop_bank_service::DropBankService,
repair::{quic_endpoint::LocalRequest, repair_service::RepairInfo},
repair::{
quic_endpoint::LocalRequest,
repair_service::{OutstandingShredRepairs, RepairInfo},
},
replay_stage::{ReplayStage, ReplayStageConfig},
rewards_recorder_service::RewardsRecorderSender,
shred_fetch_stage::ShredFetchStage,
@@ -138,6 +141,7 @@ impl Tvu {
turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
repair_quic_endpoint_sender: AsyncSender<LocalRequest>,
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
) -> Result<Self, String> {
let TvuSockets {
repair: repair_socket,
@@ -228,6 +232,7 @@ impl Tvu {
ancestor_hashes_replay_update_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
outstanding_repair_requests,
)
};

@@ -442,6 +447,7 @@ pub mod tests {
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
let tvu = Tvu::new(
&vote_keypair.pubkey(),
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
@@ -496,6 +502,7 @@ pub mod tests {
turbine_quic_endpoint_sender,
turbine_quic_endpoint_receiver,
repair_quic_endpoint_sender,
outstanding_repair_requests,
)
.expect("assume success");
exit.store(true, Ordering::Relaxed);
8 changes: 7 additions & 1 deletion core/src/validator.rs
@@ -1255,13 +1255,16 @@ impl Validator {
};
let last_vote = tower.last_vote();

let outstanding_repair_requests =
Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();

let tvu = Tvu::new(
vote_account,
authorized_voter_keypairs,
&bank_forks,
&cluster_info,
TvuSockets {
repair: node.sockets.repair,
repair: node.sockets.repair.try_clone().unwrap(),
retransmit: node.sockets.retransmit_sockets,
fetch: node.sockets.tvu,
ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
@@ -1307,6 +1310,7 @@ impl Validator {
turbine_quic_endpoint_sender.clone(),
turbine_quic_endpoint_receiver,
repair_quic_endpoint_sender,
outstanding_repair_requests.clone(),
)?;

if in_wen_restart {
@@ -1383,6 +1387,8 @@ impl Validator {
vote_account: *vote_account,
repair_whitelist: config.repair_whitelist.clone(),
notifies: key_notifies,
repair_socket: Arc::new(node.sockets.repair),
outstanding_repair_requests,
});

Ok(Self {
7 changes: 3 additions & 4 deletions core/src/window_service.rs
@@ -386,9 +386,8 @@ impl WindowService {
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
dumped_slots_receiver: DumpedSlotsReceiver,
popular_pruned_forks_sender: PopularPrunedForksSender,
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
) -> WindowService {
let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();

let cluster_info = repair_info.cluster_info.clone();
let bank_forks = repair_info.bank_forks.clone();

@@ -401,7 +400,7 @@
repair_quic_endpoint_response_sender,
repair_info,
verified_vote_receiver,
outstanding_requests.clone(),
outstanding_repair_requests.clone(),
ancestor_hashes_replay_update_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
@@ -426,7 +425,7 @@
duplicate_sender,
completed_data_sets_sender,
retransmit_sender,
outstanding_requests,
outstanding_repair_requests,
);

WindowService {
36 changes: 36 additions & 0 deletions validator/src/admin_rpc_service.rs
@@ -13,6 +13,7 @@ use {
solana_core::{
admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
consensus::{tower_storage::TowerStorage, Tower},
repair::repair_service,
validator::ValidatorStartProgress,
},
solana_geyser_plugin_manager::GeyserPluginManagerRequest,
@@ -207,6 +208,15 @@ pub trait AdminRpc {
#[rpc(meta, name = "contactInfo")]
fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;

#[rpc(meta, name = "repairShredFromPeer")]
fn repair_shred_from_peer(
&self,
meta: Self::Metadata,
pubkey: Pubkey,
slot: u64,
shred_index: u64,
) -> Result<()>;

#[rpc(meta, name = "repairWhitelist")]
fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;

@@ -487,6 +497,28 @@ impl AdminRpc for AdminRpcImpl {
meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
}

fn repair_shred_from_peer(
&self,
meta: Self::Metadata,
pubkey: Pubkey,
slot: u64,
shred_index: u64,
) -> Result<()> {
debug!("repair_shred_from_peer request received");

meta.with_post_init(|post_init| {
repair_service::RepairService::request_repair_for_shred_from_peer(
post_init.cluster_info.clone(),
pubkey,
slot,
shred_index,
&post_init.repair_socket.clone(),
post_init.outstanding_repair_requests.clone(),
);
Ok(())
})
}

fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
debug!("repair_whitelist request received");

@@ -895,6 +927,10 @@ mod tests {
vote_account,
repair_whitelist,
notifies: Vec::new(),
repair_socket: Arc::new(std::net::UdpSocket::bind("0.0.0.0:0").unwrap()),
outstanding_repair_requests: Arc::<
RwLock<repair_service::OutstandingShredRepairs>,
>::default(),
}))),
staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
rpc_to_plugin_manager_sender: None,
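
The CLI handler that drives this new `repairShredFromPeer` RPC lives in the remaining changed file, which is not shown above. For orientation only, a hedged sketch of what such a caller could look like, following the pattern the validator already uses for other admin commands; `admin_rpc_service::connect`, `admin_rpc_service::runtime`, `ledger_path`, and the parsed `pubkey`/`slot`/`shred_index` values are assumptions taken from that existing pattern, not from this diff:

// Sketch only; assumes ledger_path plus pubkey, slot, and shred_index
// have already been parsed from the command line.
let admin_client = admin_rpc_service::connect(&ledger_path);
admin_rpc_service::runtime()
    .block_on(async move {
        admin_client
            .await?
            .repair_shred_from_peer(pubkey, slot, shred_index)
            .await
    })
    .unwrap_or_else(|err| {
        println!("repair shred from peer request failed: {err}");
        std::process::exit(1);
    });
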
27 changes: 27 additions & 0 deletions validator/src/cli.rs
@@ -1497,6 +1497,33 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.help("Output display mode")
)
)
.subcommand(SubCommand::with_name("repair-shred-from-peer")
.about("Request a repair from the specified validator")
.arg(
Arg::with_name("pubkey")
.long("pubkey")
.value_name("PUBKEY")
.takes_value(true)
.validator(is_pubkey)
.help("Identity pubkey of the validator to repair from")
)
.arg(
Arg::with_name("slot")
.long("slot")
.value_name("SLOT")
.takes_value(true)
.validator(is_parsable::<u64>)
.help("Slot to repair")
)
.arg(
Arg::with_name("shred")
.long("shred")
.value_name("SHRED")
.takes_value(true)
.validator(is_parsable::<u64>)
.help("Shred to repair")
)
)
.subcommand(
SubCommand::with_name("repair-whitelist")
.about("Manage the validator's repair protocol whitelist")
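
With the subcommand defined as above, the operator-facing invocation would presumably look like `solana-validator --ledger <LEDGER_PATH> repair-shred-from-peer --pubkey <PUBKEY> --slot <SLOT> --shred <SHRED>`; the wiring that forwards these arguments to the `repairShredFromPeer` admin RPC is in the changed file not shown here.
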