Skip to content

Commit

Permalink
Optimize logic for gossiping assignments
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
  • Loading branch information
alexggh committed Jul 2, 2024
1 parent 18ed309 commit 713aef4
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 13 deletions.
30 changes: 21 additions & 9 deletions polkadot/node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,21 @@ impl State {
let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
t.local_grid_neighbors().required_routing_by_index(validator_index, local)
});
// Peers that we will send the assignment to.
let mut peers = HashSet::new();

let peers_to_route_to = topology
.as_ref()
.map(|t| t.peers_to_route(required_routing))
.unwrap_or_default();

for peer in peers_to_route_to {
if !entry.known_by.contains_key(&peer) {
continue
}

peers.insert(peer);
}

// All the peers that know the relay chain block.
let peers_to_filter = entry.known_by();
Expand All @@ -1456,20 +1471,13 @@ impl State {
let n_peers_total = self.peer_views.len();
let source_peer = source.peer_id();

// Peers that we will send the assignment to.
let mut peers = Vec::new();

// Filter destination peers
for peer in peers_to_filter.into_iter() {
if Some(peer) == source_peer {
continue
}

if let Some(true) = topology
.as_ref()
.map(|t| t.local_grid_neighbors().route_to_peer(required_routing, &peer))
{
peers.push(peer);
if peers.contains(&peer) {
continue
}

Expand All @@ -1485,7 +1493,11 @@ impl State {

if route_random {
approval_entry.routing_info_mut().mark_randomly_sent(peer);
peers.push(peer);
peers.insert(peer);
}

if approval_entry.routing_info().random_routing.is_complete() {
break
}
}

Expand Down
12 changes: 8 additions & 4 deletions polkadot/node/network/approval-distribution/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2404,7 +2404,7 @@ fn propagates_locally_generated_assignment_to_both_dimensions() {
let assignments = vec![(cert.clone(), candidate_index)];
let approvals = vec![approval.clone()];

let assignment_sent_peers = assert_matches!(
let mut assignment_sent_peers = assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
sent_peers,
Expand All @@ -2428,12 +2428,14 @@ fn propagates_locally_generated_assignment_to_both_dimensions() {
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
sent_peers,
mut sent_peers,
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals)
))
)) => {
// Random sampling is reused from the assignment.
sent_peers.sort();
assignment_sent_peers.sort();
assert_eq!(sent_peers, assignment_sent_peers);
assert_eq!(sent_approvals, approvals);
}
Expand Down Expand Up @@ -2678,7 +2680,7 @@ fn propagates_to_required_after_connect() {
let assignments = vec![(cert.clone(), candidate_index)];
let approvals = vec![approval.clone()];

let assignment_sent_peers = assert_matches!(
let mut assignment_sent_peers = assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
sent_peers,
Expand All @@ -2702,12 +2704,14 @@ fn propagates_to_required_after_connect() {
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
sent_peers,
mut sent_peers,
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals)
))
)) => {
// Random sampling is reused from the assignment.
sent_peers.sort();
assignment_sent_peers.sort();
assert_eq!(sent_peers, assignment_sent_peers);
assert_eq!(sent_approvals, approvals);
}
Expand Down
22 changes: 22 additions & 0 deletions polkadot/node/network/protocol/src/grid_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,23 @@ impl SessionGridTopologyEntry {
self.topology.is_validator(peer)
}

/// Returns the list of peers to route based on the required routing.
pub fn peers_to_route(&self, required_routing: RequiredRouting) -> Vec<PeerId> {
match required_routing {
RequiredRouting::All => self.topology.peer_ids.iter().copied().collect(),
RequiredRouting::GridX => self.local_neighbors.peers_x.iter().copied().collect(),
RequiredRouting::GridY => self.local_neighbors.peers_y.iter().copied().collect(),
RequiredRouting::GridXY => self
.local_neighbors
.peers_x
.iter()
.chain(self.local_neighbors.peers_y.iter())
.copied()
.collect(),
RequiredRouting::None | RequiredRouting::PendingTopology => Vec::new(),
}
}

/// Updates the known peer ids for the passed authorities ids.
pub fn update_authority_ids(
&mut self,
Expand Down Expand Up @@ -524,6 +541,11 @@ impl RandomRouting {
pub fn inc_sent(&mut self) {
self.sent += 1
}

/// Returns `true` if we already took all the necessary samples.
pub fn is_complete(&self) -> bool {
self.sent >= self.target
}
}

/// Routing mode
Expand Down

0 comments on commit 713aef4

Please sign in to comment.