Skip to content

Commit

Permalink
pings received contact-infos on gossip socket address
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Jun 5, 2024
1 parent f807911 commit d6bf04a
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 6 deletions.
84 changes: 79 additions & 5 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ pub const MAX_INCREMENTAL_SNAPSHOT_HASHES: usize = 25;
const MAX_PRUNE_DATA_NODES: usize = 32;
/// Number of bytes in the randomly generated token sent with ping messages.
const GOSSIP_PING_TOKEN_SIZE: usize = 32;
const GOSSIP_PING_CACHE_CAPACITY: usize = 65536;
const GOSSIP_PING_CACHE_CAPACITY: usize = 262144;
const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(1280);
const GOSSIP_PING_CACHE_RATE_LIMIT_DELAY: Duration = Duration::from_secs(1280 / 64);
pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000;
Expand Down Expand Up @@ -2470,6 +2470,20 @@ impl ClusterInfo {
}
Ok(())
};
let mut rng = rand::thread_rng();
let keypair: Arc<Keypair> = self.keypair().clone();
let mut pings = Vec::new();
let mut verify_gossip_addr = |value: &CrdsValue| {
verify_gossip_addr(
&mut rng,
&keypair,
value,
stakes,
&self.socket_addr_space,
&self.ping_cache,
&mut pings,
)
};
// Split packets based on their types.
let mut pull_requests = vec![];
let mut pull_responses = vec![];
Expand All @@ -2480,15 +2494,23 @@ impl ClusterInfo {
for (from_addr, packet) in packets {
match packet {
Protocol::PullRequest(filter, caller) => {
pull_requests.push((from_addr, filter, caller))
if verify_gossip_addr(&caller) {
pull_requests.push((from_addr, filter, caller))
}
}
Protocol::PullResponse(_, mut data) => {
check_duplicate_instance(&data)?;
pull_responses.append(&mut data);
data.retain(&mut verify_gossip_addr);
if !data.is_empty() {
pull_responses.append(&mut data);
}
}
Protocol::PushMessage(from, data) => {
Protocol::PushMessage(from, mut data) => {
check_duplicate_instance(&data)?;
push_messages.push((from, data));
data.retain(&mut verify_gossip_addr);
if !data.is_empty() {
push_messages.push((from, data));
}
}
Protocol::PruneMessage(_from, data) => prune_messages.push(data),
Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)),
Expand All @@ -2502,6 +2524,21 @@ impl ClusterInfo {
}
push_messages.retain(|(_, data)| !data.is_empty());
}
if !pings.is_empty() {
let pings: Vec<_> = pings
.into_iter()
.map(|(addr, ping)| (addr, Protocol::PingMessage(ping)))
.collect();
self.stats
.packets_sent_gossip_requests_count
.add_relaxed(pings.len() as u64);
let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
recycler,
"ping_contact_infos",
&pings,
);
let _ = response_sender.send(packet_batch);
}
self.handle_batch_ping_messages(ping_messages, recycler, response_sender);
self.handle_batch_prune_messages(prune_messages, stakes);
self.handle_batch_push_messages(
Expand Down Expand Up @@ -3206,6 +3243,43 @@ fn filter_on_shred_version(
}
}

// If the CRDS value is a unstaked contact-info verifies if
// it has responded to ping on its gossip socket address.
// Returns false if the CRDS value should be discarded.
#[must_use]
fn verify_gossip_addr<R: Rng + CryptoRng>(
rng: &mut R,
keypair: &Keypair,
value: &CrdsValue,
stakes: &HashMap<Pubkey, u64>,
socket_addr_space: &SocketAddrSpace,
ping_cache: &Mutex<PingCache>,
pings: &mut Vec<(SocketAddr, Ping)>,
) -> bool {
let (pubkey, addr) = match &value.data {
CrdsData::ContactInfo(node) => (node.pubkey(), node.gossip()),
CrdsData::LegacyContactInfo(node) => (node.pubkey(), node.gossip()),
_ => return true, // If not a contact-info, nothing to verify.
};
// For (sufficiently) staked nodes, don't bother with ping/pong.
if stakes.get(pubkey) >= Some(&MIN_STAKE_FOR_GOSSIP) {
return true;
}
let Some(addr) = addr.ok().filter(|addr| socket_addr_space.check(addr)) else {
return false;
};
let (out, ping) = {
let node = (*pubkey, addr);
let mut pingf = move || Ping::new_rand(rng, keypair).ok();
let mut ping_cache = ping_cache.lock().unwrap();
ping_cache.check(Instant::now(), node, &mut pingf)
};
if let Some(ping) = ping {
pings.push((addr, ping));
}
out
}

#[cfg(test)]
mod tests {
use {
Expand Down
7 changes: 6 additions & 1 deletion gossip/src/legacy_contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ impl LegacyContactInfo {
self.shred_version
}

get_socket!(gossip);
pub(crate) fn gossip(&self) -> Result<SocketAddr, Error> {
let socket = &self.gossip;
crate::contact_info::sanitize_socket(socket)?;
Ok(socket).copied()
}

get_socket!(tvu, tvu_quic);
get_socket!(@quic tpu);
get_socket!(@quic tpu_forwards);
Expand Down

0 comments on commit d6bf04a

Please sign in to comment.