Skip to content

Commit

Permalink
reworks max number of outgoing push messages
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Sep 27, 2024
1 parent bce28c0 commit 1534cff
Showing 1 changed file with 5 additions and 13 deletions.
18 changes: 5 additions & 13 deletions gossip/src/crds_gossip_push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ use {
push_active_set::PushActiveSet,
received_cache::ReceivedCache,
},
bincode::serialized_size,
itertools::Itertools,
solana_sdk::{
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
signature::{Keypair, Signer},
timing::timestamp,
Expand Down Expand Up @@ -53,8 +51,6 @@ const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2;
const CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE: usize = CRDS_GOSSIP_PUSH_FANOUT + 3;

pub struct CrdsGossipPush {
/// Max bytes per message
max_bytes: usize,
/// Active set of validators for push
active_set: RwLock<PushActiveSet>,
/// Cursor into the crds table for values to push.
Expand All @@ -74,8 +70,6 @@ pub struct CrdsGossipPush {
impl Default for CrdsGossipPush {
fn default() -> Self {
Self {
// Allow upto 64 Crds Values per PUSH
max_bytes: PACKET_DATA_SIZE * 64,
active_set: RwLock::default(),
crds_cursor: Mutex::default(),
received_cache: Mutex::new(ReceivedCache::new(2 * CRDS_UNIQUE_PUBKEY_CAPACITY)),
Expand Down Expand Up @@ -180,10 +174,10 @@ impl CrdsGossipPush {
usize, // number of values
usize, // number of push messages
) {
const MAX_NUM_PUSHES : usize = 10_000;
let active_set = self.active_set.read().unwrap();
let mut num_pushes = 0;
let mut num_values = 0;
let mut total_bytes: usize = 0;
let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
let wallclock_window = self.wallclock_window(now);
let mut crds_cursor = self.crds_cursor.lock().unwrap();
Expand All @@ -193,12 +187,7 @@ impl CrdsGossipPush {
.get_entries(crds_cursor.deref_mut())
.map(|entry| &entry.value)
.filter(|value| wallclock_window.contains(&value.wallclock()));
for value in entries {
let serialized_size = serialized_size(&value).unwrap();
total_bytes = total_bytes.saturating_add(serialized_size as usize);
if total_bytes > self.max_bytes {
break;
}
'outer: for value in entries {
num_values += 1;
let origin = value.pubkey();
let nodes = active_set.get_nodes(
Expand All @@ -210,6 +199,9 @@ impl CrdsGossipPush {
for node in nodes.take(self.push_fanout) {
push_messages.entry(*node).or_default().push(value.clone());
num_pushes += 1;
if num_pushes >= MAX_NUM_PUSHES {
break 'outer;
}
}
}
drop(crds);
Expand Down

0 comments on commit 1534cff

Please sign in to comment.