Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

approval-distribution: Update topology if authorities are discovered later #2981

Merged
merged 11 commits into from
Jan 25, 2024
Merged
12 changes: 8 additions & 4 deletions polkadot/node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,9 +667,12 @@ impl State {
rng: &mut (impl CryptoRng + Rng),
) {
match event {
NetworkBridgeEvent::PeerConnected(peer_id, role, version, _) => {
NetworkBridgeEvent::PeerConnected(peer_id, role, version, authority_ids) => {
gum::info!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected");
if let Some(authority_ids) = authority_ids {
self.topologies.update_authority_ids(peer_id, &authority_ids);
}
// insert a blank view if none already present
gum::trace!(target: LOG_TARGET, ?peer_id, ?role, "Peer connected");
self.peer_views
.entry(peer_id)
.or_insert(PeerEntry { view: Default::default(), version });
Expand Down Expand Up @@ -716,8 +719,9 @@ impl State {
NetworkBridgeEvent::PeerMessage(peer_id, message) => {
self.process_incoming_peer_message(ctx, metrics, peer_id, message, rng).await;
},
NetworkBridgeEvent::UpdatedAuthorityIds { .. } => {
// The approval-distribution subsystem doesn't deal with `AuthorityDiscoveryId`s.
NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => {
gum::info!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids");
self.topologies.update_authority_ids(peer_id, &authority_ids);
},
}
}
Expand Down
5 changes: 3 additions & 2 deletions polkadot/node/network/gossip-support/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,10 @@ where
session_index,
)
.await?;

self.update_authority_ids(sender, session_info.discovery_keys).await;
}
// authority_discovery is just a cache so let's try every leaf to detect if there
// are new authorities there.
self.update_authority_ids(sender, session_info.discovery_keys).await;
}
}
Ok(())
Expand Down
60 changes: 58 additions & 2 deletions polkadot/node/network/protocol/src/grid_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,25 @@ impl SessionGridTopology {
SessionGridTopology { shuffled_indices, canonical_shuffling, peer_ids }
}

/// Updates the known peer ids for the passed authorithies ids.
pub fn update_authority_ids(
&mut self,
peer_id: PeerId,
ids: &HashSet<AuthorityDiscoveryId>,
) -> bool {
if self.peer_ids.contains(&peer_id) {
for peer in self
.canonical_shuffling
.iter_mut()
.filter(|peer| ids.contains(&peer.discovery_id))
{
peer.peer_ids.push(peer_id);
self.peer_ids.insert(peer_id);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is already in peer_ids, why do we insert it?

Copy link
Contributor Author

@alexggh alexggh Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's why tests it is on the todo list, fixed.

I was looking more about what you think of the approach does it seem reasonable to you ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable at first sight, yes.

return true;
}
}
false
}
/// Produces the outgoing routing logic for a particular peer.
///
/// Returns `None` if the validator index is out of bounds.
Expand Down Expand Up @@ -269,6 +288,7 @@ impl GridNeighbors {
pub struct SessionGridTopologyEntry {
topology: SessionGridTopology,
local_neighbors: GridNeighbors,
local_index: Option<ValidatorIndex>,
}

impl SessionGridTopologyEntry {
Expand All @@ -291,6 +311,25 @@ impl SessionGridTopologyEntry {
pub fn is_validator(&self, peer: &PeerId) -> bool {
self.topology.is_validator(peer)
}

/// Updates the known peer ids for the passed authorithies ids.
pub fn update_authority_ids(
&mut self,
peer_id: PeerId,
ids: &HashSet<AuthorityDiscoveryId>,
) -> bool {
let peer_id_updated = self.topology.update_authority_ids(peer_id, ids);
// If we added a new peer id we need to recompute the grid neighbors, so that
// neighbors_x and neighbors_y reflect the right peer ids.
if peer_id_updated {
if let Some(local_index) = self.local_index.as_ref() {
let grid_neighbors =
self.topology.compute_grid_neighbors_for(*local_index).unwrap();
alexggh marked this conversation as resolved.
Show resolved Hide resolved
self.local_neighbors = grid_neighbors;
}
}
peer_id_updated
}
}

/// A set of topologies indexed by session
Expand All @@ -305,6 +344,20 @@ impl SessionGridTopologies {
self.inner.get(&session).and_then(|val| val.0.as_ref())
}

/// Updates the known peer ids for the passed authorithies ids.
pub fn update_authority_ids(
&mut self,
peer_id: PeerId,
ids: &HashSet<AuthorityDiscoveryId>,
) -> bool {
self.inner
.iter_mut()
.map(|(_, topology)| {
topology.0.as_mut().map(|topology| topology.update_authority_ids(peer_id, ids))
})
.any(|updated| updated.unwrap_or_default())
}

/// Increase references counter for a specific topology
pub fn inc_session_refs(&mut self, session: SessionIndex) {
self.inner.entry(session).or_insert((None, 0)).1 += 1;
Expand All @@ -330,10 +383,11 @@ impl SessionGridTopologies {
let entry = self.inner.entry(session).or_insert((None, 0));
if entry.0.is_none() {
let local_neighbors = local_index
.clone()
.and_then(|l| topology.compute_grid_neighbors_for(l))
.unwrap_or_else(GridNeighbors::empty);

entry.0 = Some(SessionGridTopologyEntry { topology, local_neighbors });
entry.0 = Some(SessionGridTopologyEntry { topology, local_neighbors, local_index });
}
}
}
Expand Down Expand Up @@ -368,6 +422,7 @@ impl Default for SessionBoundGridTopologyStorage {
peer_ids: Default::default(),
},
local_neighbors: GridNeighbors::empty(),
local_index: None,
},
},
prev_topology: None,
Expand Down Expand Up @@ -406,13 +461,14 @@ impl SessionBoundGridTopologyStorage {
local_index: Option<ValidatorIndex>,
) {
let local_neighbors = local_index
.clone()
.and_then(|l| topology.compute_grid_neighbors_for(l))
.unwrap_or_else(GridNeighbors::empty);

let old_current = std::mem::replace(
&mut self.current_topology,
GridTopologySessionBound {
entry: SessionGridTopologyEntry { topology, local_neighbors },
entry: SessionGridTopologyEntry { topology, local_neighbors, local_index },
session_index,
},
);
Expand Down
Loading