Skip to content

Commit

Permalink
removes ingesting (legacy) contact-info from incoming pull-requests (a…
Browse files Browse the repository at this point in the history
…nza-xyz#3317)

Gossip values should be propagated through push messages and pull
responses. We should not rely on pull requests to propagate gossip
data.
  • Loading branch information
behzadnouri authored Oct 29, 2024
1 parent 8abbd16 commit 0931864
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 177 deletions.
6 changes: 0 additions & 6 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2107,12 +2107,6 @@ impl ClusterInfo {
) -> PacketBatch {
const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT;
let mut time = Measure::start("handle_pull_requests");
let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller));
{
let _st = ScopedTimer::from(&self.stats.process_pull_requests);
self.gossip
.process_pull_requests(callers.cloned(), timestamp());
}
let output_size_limit =
self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
let mut packet_batch =
Expand Down
6 changes: 0 additions & 6 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ pub struct GossipStats {
pub(crate) packets_sent_push_messages_count: Counter,
pub(crate) process_gossip_packets_time: Counter,
pub(crate) process_prune: Counter,
pub(crate) process_pull_requests: Counter,
pub(crate) process_pull_response: Counter,
pub(crate) process_pull_response_count: Counter,
pub(crate) process_pull_response_fail_insert: Counter,
Expand Down Expand Up @@ -328,11 +327,6 @@ pub(crate) fn submit_gossip_stats(
stats.process_pull_response_len.clear(),
i64
),
(
"process_pull_requests",
stats.process_pull_requests.clear(),
i64
),
(
"pull_request_ping_pong_check_failed_count",
stats.pull_request_ping_pong_check_failed_count.clear(),
Expand Down
8 changes: 0 additions & 8 deletions gossip/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,6 @@ impl CrdsGossip {
)
}

/// Records the callers of incoming pull requests in this node's crds table
/// by delegating to `CrdsGossipPull::process_pull_requests`.
///
/// This method does not build or return a response; it has no return value.
///
/// * `callers` - crds values identifying the nodes that sent the requests.
/// * `now` - timestamp forwarded unchanged to the pull-request handler.
pub fn process_pull_requests<I>(&self, callers: I, now: u64)
where
I: IntoIterator<Item = CrdsValue>,
{
CrdsGossipPull::process_pull_requests(&self.crds, callers, now);
}

pub fn generate_pull_responses(
&self,
thread_pool: &ThreadPool,
Expand Down
78 changes: 0 additions & 78 deletions gossip/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,19 +293,6 @@ impl CrdsGossipPull {
Ok(out.into_values().collect())
}

/// Records the callers of incoming pull requests in the crds table.
///
/// The write lock on `crds` is taken once and held while every caller is
/// processed. For each caller value:
/// 1. the value is inserted with route `GossipRoute::PullRequest`; the
///    insert result is deliberately discarded (best effort);
/// 2. `update_record_timestamp` is invoked for the caller's pubkey with
///    `now`, regardless of whether the insert succeeded.
///
/// Panics if the lock is poisoned.
pub(crate) fn process_pull_requests<I>(crds: &RwLock<Crds>, callers: I, now: u64)
where
    I: IntoIterator<Item = CrdsValue>,
{
    let mut table = crds.write().unwrap();
    callers.into_iter().for_each(|value| {
        // Grab the pubkey first: `insert` takes ownership of the value.
        let pubkey = value.pubkey();
        let _ = table.insert(value, now, GossipRoute::PullRequest);
        table.update_record_timestamp(&pubkey, now);
    });
}

/// Create gossip responses to pull requests
pub(crate) fn generate_pull_responses(
thread_pool: &ThreadPool,
Expand Down Expand Up @@ -1061,66 +1048,6 @@ pub(crate) mod tests {
assert_eq!(rsp.iter().find(|r| r.len() == 1).unwrap().len(), 1);
}

// Checks that `CrdsGossipPull::process_pull_requests` stores the caller's
// contact-info in the destination crds table stamped with the processing
// time, while the pull responses generated from that table are all empty.
#[test]
fn test_process_pull_request() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let node_keypair = Keypair::new();
let mut node_crds = Crds::default();
// The requesting node's own contact-info; a clone is kept as `caller`.
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&node_keypair.pubkey(),
0,
)));
let caller = entry.clone();
let node = CrdsGossipPull::default();
node_crds
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
let mut ping_cache = PingCache::new(
Duration::from_secs(20 * 60), // ttl
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
128, // capacity
);
// Add a second peer and mock a pong from it in the ping cache
// (presumably so that it qualifies as a pull target -- confirm against
// `new_pull_request`).
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(*new.pubkey(), new.gossip().unwrap(), Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap();
let node_crds = RwLock::new(node_crds);
let mut pings = Vec::new();
let req = node.new_pull_request(
&thread_pool,
&node_crds,
&node_keypair,
0,
0,
None,
&HashMap::new(),
PACKET_DATA_SIZE,
&Mutex::new(ping_cache),
&mut pings,
&SocketAddrSpace::Unspecified,
);

// Destination table is default-constructed; responses are generated from
// it before the callers are processed.
let dest_crds = RwLock::<Crds>::default();
let filters = req.unwrap().into_iter().flat_map(|(_, filters)| filters);
// Pair every filter of the request with the caller's contact-info.
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = CrdsGossipPull::generate_pull_responses(
&thread_pool,
&dest_crds,
&filters,
usize::MAX, // output_size_limit
0, // now
&GossipStats::default(),
);
let callers = filters.into_iter().map(|(caller, _)| caller);
// Process the callers with now = 1.
CrdsGossipPull::process_pull_requests(&dest_crds, callers, 1);
let dest_crds = dest_crds.read().unwrap();
assert!(rsp.iter().all(|rsp| rsp.is_empty()));
// The caller must now be present in the destination table ...
assert!(dest_crds.get::<&CrdsValue>(&caller.label()).is_some());
// ... with a local timestamp equal to the `now` passed above.
assert_eq!(1, {
let entry: &VersionedCrdsValue = dest_crds.get(&caller.label()).unwrap();
entry.local_timestamp
});
}
#[test]
fn test_process_pull_request_response() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
Expand Down Expand Up @@ -1202,11 +1129,6 @@ pub(crate) mod tests {
0, // now
&GossipStats::default(),
);
CrdsGossipPull::process_pull_requests(
&dest_crds,
filters.into_iter().map(|(caller, _)| caller),
0,
);
// if there is a false positive this is empty
// prob should be around 0.1 per iteration
if rsp.is_empty() {
Expand Down
70 changes: 1 addition & 69 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use {
std::{
borrow::{Borrow, Cow},
cmp::Ordering,
collections::{hash_map::Entry, BTreeSet, HashMap},
collections::BTreeSet,
fmt,
},
};
Expand Down Expand Up @@ -695,30 +695,6 @@ impl CrdsValue {
}
}

/// Filters out an iterator of crds values, returning
/// the unique ones with the most recent wallclock.
pub(crate) fn filter_current<'a, I>(values: I) -> impl Iterator<Item = &'a CrdsValue>
where
I: IntoIterator<Item = &'a CrdsValue>,
{
let mut out = HashMap::new();
for value in values {
match out.entry(value.label()) {
Entry::Vacant(entry) => {
entry.insert((value, value.wallclock()));
}
Entry::Occupied(mut entry) => {
let value_wallclock = value.wallclock();
let (_, entry_wallclock) = entry.get();
if *entry_wallclock < value_wallclock {
entry.insert((value, value_wallclock));
}
}
}
}
out.into_iter().map(|(_, (v, _))| v)
}

pub(crate) fn sanitize_wallclock(wallclock: u64) -> Result<(), SanitizeError> {
if wallclock >= MAX_WALLCLOCK {
Err(SanitizeError::ValueOutOfBounds)
Expand All @@ -732,15 +708,12 @@ mod test {
use {
super::*,
bincode::{deserialize, Options},
rand::SeedableRng,
rand_chacha::ChaChaRng,
solana_perf::test_tx::new_test_vote_tx,
solana_sdk::{
signature::{Keypair, Signer},
timing::timestamp,
},
solana_vote_program::{vote_instruction, vote_state},
std::{cmp::Ordering, iter::repeat_with},
};

#[test]
Expand Down Expand Up @@ -903,47 +876,6 @@ mod test {
serialize_deserialize_value(value, correct_keypair);
}

// Checks `filter_current` on 4096 random crds values drawn from 16 keypairs:
// the output must have unique labels, and no input value may have a newer
// wallclock than the retained value for its label.
#[test]
fn test_filter_current() {
// Fixed seed for the ChaCha rng.
let seed = [48u8; 32];
let mut rng = ChaChaRng::from_seed(seed);
let keys: Vec<_> = repeat_with(Keypair::new).take(16).collect();
// Each value is generated for a randomly chosen one of the 16 keypairs.
let values: Vec<_> = repeat_with(|| {
let index = rng.gen_range(0..keys.len());
CrdsValue::new_rand(&mut rng, Some(&keys[index]))
})
.take(1 << 12)
.collect();
let mut currents = HashMap::new();
for value in filter_current(&values) {
// Assert that filtered values have unique labels.
assert!(currents.insert(value.label(), value).is_none());
}
// Assert that currents are the most recent version of each value.
// `count` tallies the inputs that are equal to the retained value for
// their label; it is expected to match the number of retained labels.
let mut count = 0;
for value in &values {
let current_value = currents.get(&value.label()).unwrap();
match value.wallclock().cmp(&current_value.wallclock()) {
Ordering::Less => (),
Ordering::Equal => {
// There is a chance that two randomly generated
// crds-values have the same label and wallclock.
if value == *current_value {
count += 1;
}
}
Ordering::Greater => panic!("this should not happen!"),
}
}
assert_eq!(count, currents.len());
// Currently CrdsData::new_rand is implemented for:
// AccountsHashes, ContactInfo, LowestSlot, LegacySnapshotHashes, Version
// EpochSlots x MAX_EPOCH_SLOTS
// Vote x MAX_VOTES
// Upper bound on distinct labels: number of keys times kinds per key.
let num_kinds = 5 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize;
assert!(currents.len() <= keys.len() * num_kinds);
}

#[test]
fn test_node_instance_crds_lable() {
fn make_crds_value(node: NodeInstance) -> CrdsValue {
Expand Down
40 changes: 30 additions & 10 deletions gossip/tests/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use {
solana_streamer::socket::SocketAddrSpace,
std::{
collections::{HashMap, HashSet},
net::Ipv4Addr,
ops::Deref,
sync::{Arc, Mutex},
time::{Duration, Instant},
Expand Down Expand Up @@ -96,13 +97,20 @@ fn stakes(network: &Network) -> HashMap<Pubkey, u64> {
}

fn star_network_create(num: usize) -> Network {
let gossip_port_offset = 9000;
let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut network: HashMap<_, _> = (1..num)
.map(|_| {
.map(|k| {
let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
// Need unique gossip addresses, otherwise nodes will be deduped by
// crds_gossip::dedup_gossip_addresses before peer sampling.
let mut contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let gossip_port = gossip_port_offset + u16::try_from(k).unwrap();
contact_info
.set_gossip((Ipv4Addr::LOCALHOST, gossip_port))
.unwrap();
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let node = CrdsGossip::default();
{
Expand Down Expand Up @@ -246,6 +254,24 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {

fn network_simulator_pull_only(thread_pool: &ThreadPool, network: &Network) {
let num = network.len();
// In the absence of gossip push messages, a pull-only network with star
// topology will not converge because it forms a DAG. We add additional
// edges so that there is a directed path between every pair of nodes.
let (pubkeys, mut entries): (Vec<_>, Vec<_>) = network
.nodes
.iter()
.map(|(&pubkey, node)| {
let label = CrdsValueLabel::ContactInfo(pubkey);
let crds = node.gossip.crds.read().unwrap();
let entry = crds.get::<&CrdsValue>(&label).unwrap().clone();
(pubkey, entry)
})
.unzip();
entries.rotate_right(1);
for (pubkey, entry) in pubkeys.into_iter().zip(entries) {
let mut crds = network.nodes[&pubkey].gossip.crds.write().unwrap();
let _ = crds.insert(entry, timestamp(), GossipRoute::LocalMessage);
}
let (converged, bytes_tx) = network_run_pull(thread_pool, network, 0, num * 2, 0.9);
trace!(
"network_simulator_pull_{}: converged: {} total_bytes: {}",
Expand Down Expand Up @@ -540,8 +566,7 @@ fn network_run_pull(
let rsp: Vec<_> = network
.get(&to)
.map(|node| {
let rsp = node
.gossip
node.gossip
.generate_pull_responses(
thread_pool,
&filters,
Expand All @@ -551,12 +576,7 @@ fn network_run_pull(
)
.into_iter()
.flatten()
.collect();
node.gossip.process_pull_requests(
filters.into_iter().map(|(caller, _)| caller),
now,
);
rsp
.collect()
})
.unwrap();
bytes += serialized_size(&rsp).unwrap() as usize;
Expand Down

0 comments on commit 0931864

Please sign in to comment.