From 781daf03c0dd3d4a327b1c244fb8ae3b29f69fab Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Thu, 18 Jan 2024 10:43:28 +0200 Subject: [PATCH 01/11] Update authorithy ids every block Signed-off-by: Alexandru Gheorghe --- .../network/approval-distribution/src/lib.rs | 12 ++-- .../node/network/gossip-support/src/lib.rs | 5 +- .../network/protocol/src/grid_topology.rs | 60 ++++++++++++++++++- 3 files changed, 69 insertions(+), 8 deletions(-) diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index d520febaef51..0853e52f4aad 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -667,9 +667,12 @@ impl State { rng: &mut (impl CryptoRng + Rng), ) { match event { - NetworkBridgeEvent::PeerConnected(peer_id, role, version, _) => { + NetworkBridgeEvent::PeerConnected(peer_id, role, version, authority_ids) => { + gum::info!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected"); + if let Some(authority_ids) = authority_ids { + self.topologies.update_authority_ids(peer_id, &authority_ids); + } // insert a blank view if none already present - gum::trace!(target: LOG_TARGET, ?peer_id, ?role, "Peer connected"); self.peer_views .entry(peer_id) .or_insert(PeerEntry { view: Default::default(), version }); @@ -716,8 +719,9 @@ impl State { NetworkBridgeEvent::PeerMessage(peer_id, message) => { self.process_incoming_peer_message(ctx, metrics, peer_id, message, rng).await; }, - NetworkBridgeEvent::UpdatedAuthorityIds { .. } => { - // The approval-distribution subsystem doesn't deal with `AuthorityDiscoveryId`s. + NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => { + gum::info!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids"); + self.topologies.update_authority_ids(peer_id, &authority_ids); }, } } diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index 22417795d5ea..86245479baf7 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -270,9 +270,10 @@ where session_index, ) .await?; - - self.update_authority_ids(sender, session_info.discovery_keys).await; } + // authority_discovery is just a cache so let's try every leaf to detect if there + // are new authorities there. + self.update_authority_ids(sender, session_info.discovery_keys).await; } } Ok(()) diff --git a/polkadot/node/network/protocol/src/grid_topology.rs b/polkadot/node/network/protocol/src/grid_topology.rs index 8bd9adbc17c1..ed8d400e081f 100644 --- a/polkadot/node/network/protocol/src/grid_topology.rs +++ b/polkadot/node/network/protocol/src/grid_topology.rs @@ -89,6 +89,25 @@ impl SessionGridTopology { SessionGridTopology { shuffled_indices, canonical_shuffling, peer_ids } } + /// Updates the known peer ids for the passed authorithies ids. + pub fn update_authority_ids( + &mut self, + peer_id: PeerId, + ids: &HashSet, + ) -> bool { + if self.peer_ids.contains(&peer_id) { + for peer in self + .canonical_shuffling + .iter_mut() + .filter(|peer| ids.contains(&peer.discovery_id)) + { + peer.peer_ids.push(peer_id); + self.peer_ids.insert(peer_id); + return true; + } + } + false + } /// Produces the outgoing routing logic for a particular peer. /// /// Returns `None` if the validator index is out of bounds. @@ -269,6 +288,7 @@ impl GridNeighbors { pub struct SessionGridTopologyEntry { topology: SessionGridTopology, local_neighbors: GridNeighbors, + local_index: Option, } impl SessionGridTopologyEntry { @@ -291,6 +311,25 @@ impl SessionGridTopologyEntry { pub fn is_validator(&self, peer: &PeerId) -> bool { self.topology.is_validator(peer) } + + /// Updates the known peer ids for the passed authorithies ids. + pub fn update_authority_ids( + &mut self, + peer_id: PeerId, + ids: &HashSet, + ) -> bool { + let peer_id_updated = self.topology.update_authority_ids(peer_id, ids); + // If we added a new peer id we need to recompute the grid neighbors, so that + // neighbors_x and neighbors_y reflect the right peer ids. + if peer_id_updated { + if let Some(local_index) = self.local_index.as_ref() { + let grid_neighbors = + self.topology.compute_grid_neighbors_for(*local_index).unwrap(); + self.local_neighbors = grid_neighbors; + } + } + peer_id_updated + } } /// A set of topologies indexed by session @@ -305,6 +344,20 @@ impl SessionGridTopologies { self.inner.get(&session).and_then(|val| val.0.as_ref()) } + /// Updates the known peer ids for the passed authorithies ids. + pub fn update_authority_ids( + &mut self, + peer_id: PeerId, + ids: &HashSet, + ) -> bool { + self.inner + .iter_mut() + .map(|(_, topology)| { + topology.0.as_mut().map(|topology| topology.update_authority_ids(peer_id, ids)) + }) + .any(|updated| updated.unwrap_or_default()) + } + /// Increase references counter for a specific topology pub fn inc_session_refs(&mut self, session: SessionIndex) { self.inner.entry(session).or_insert((None, 0)).1 += 1; @@ -330,10 +383,11 @@ impl SessionGridTopologies { let entry = self.inner.entry(session).or_insert((None, 0)); if entry.0.is_none() { let local_neighbors = local_index + .clone() .and_then(|l| topology.compute_grid_neighbors_for(l)) .unwrap_or_else(GridNeighbors::empty); - entry.0 = Some(SessionGridTopologyEntry { topology, local_neighbors }); + entry.0 = Some(SessionGridTopologyEntry { topology, local_neighbors, local_index }); } } } @@ -368,6 +422,7 @@ impl Default for SessionBoundGridTopologyStorage { peer_ids: Default::default(), }, local_neighbors: GridNeighbors::empty(), + local_index: None, }, }, prev_topology: None, @@ -406,13 +461,14 @@ impl SessionBoundGridTopologyStorage { local_index: Option, ) { let local_neighbors = local_index + .clone() .and_then(|l| topology.compute_grid_neighbors_for(l)) .unwrap_or_else(GridNeighbors::empty); let old_current = std::mem::replace( &mut self.current_topology, GridTopologySessionBound { - entry: SessionGridTopologyEntry { topology, local_neighbors }, + entry: SessionGridTopologyEntry { topology, local_neighbors, local_index }, session_index, }, ); From f4c74ba26347a0590589c27110744840af93da45 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Thu, 18 Jan 2024 12:38:43 +0200 Subject: [PATCH 02/11] Fixup reverse condition Signed-off-by: Alexandru Gheorghe --- polkadot/node/network/protocol/src/grid_topology.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/network/protocol/src/grid_topology.rs b/polkadot/node/network/protocol/src/grid_topology.rs index ed8d400e081f..ca489dd21553 100644 --- a/polkadot/node/network/protocol/src/grid_topology.rs +++ b/polkadot/node/network/protocol/src/grid_topology.rs @@ -95,7 +95,7 @@ impl SessionGridTopology { peer_id: PeerId, ids: &HashSet, ) -> bool { - if self.peer_ids.contains(&peer_id) { + if !self.peer_ids.contains(&peer_id) { for peer in self .canonical_shuffling .iter_mut() From 985fe6248164a028f7f08c9c586c4a87e8d613dc Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 22 Jan 2024 11:53:17 +0200 Subject: [PATCH 03/11] Add unittest Signed-off-by: Alexandru Gheorghe --- .../network/approval-distribution/src/lib.rs | 46 ++- .../approval-distribution/src/tests.rs | 374 ++++++++++++++++-- 2 files changed, 388 insertions(+), 32 deletions(-) diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index 0853e52f4aad..59b80c8d07cc 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -721,7 +721,38 @@ impl State { }, NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => { gum::info!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids"); - self.topologies.update_authority_ids(peer_id, &authority_ids); + if self.topologies.update_authority_ids(peer_id, &authority_ids) { + if let Some(PeerEntry { view, version }) = self.peer_views.get(&peer_id) { + let intersection = self + .blocks_by_number + .iter() + .filter(|(block_number, _)| *block_number > &view.finalized_number) + .map(|(_, hashes)| { + hashes.iter().filter(|hash| { + self.blocks + .get(&hash) + .map(|block| block.known_by.get(&peer_id).is_some()) + .unwrap_or_default() + }) + }) + .flatten(); + let view_intersection = + View::new(intersection.cloned(), view.finalized_number); + Self::unify_with_peer( + ctx.sender(), + metrics, + &mut self.blocks, + &self.topologies, + self.peer_views.len(), + peer_id, + *version, + view_intersection, + rng, + true, + ) + .await; + } + } }, } } @@ -793,6 +824,7 @@ impl State { *version, view_intersection, rng, + false, ) .await; } @@ -1105,6 +1137,7 @@ impl State { protocol_version, view, rng, + false, ) .await; } @@ -1842,6 +1875,7 @@ impl State { protocol_version: ProtocolVersion, view: View, rng: &mut (impl CryptoRng + Rng), + retry_known_blocks: bool, ) { metrics.on_unify_with_peer(); let _timer = metrics.time_unify_with_peer(); @@ -1860,10 +1894,12 @@ impl State { _ => break, }; - // Any peer which is in the `known_by` set has already been - // sent all messages it's meant to get for that block and all - // in-scope prior blocks. - if entry.known_by.contains_key(&peer_id) { + // Any peer which is in the `known_by` see and we know its peer_id authorithy id + // mapping has already been sent all messages it's meant to get for that block and + // all in-scope prior blocks. In case, we just learnt about its peer_id + // authorithy-id mapping we have to retry sending the messages that should be sent + // to. + if entry.known_by.contains_key(&peer_id) && !retry_known_blocks { break } diff --git a/polkadot/node/network/approval-distribution/src/tests.rs b/polkadot/node/network/approval-distribution/src/tests.rs index 7d933e2047f2..5c63d7138e5f 100644 --- a/polkadot/node/network/approval-distribution/src/tests.rs +++ b/polkadot/node/network/approval-distribution/src/tests.rs @@ -130,7 +130,7 @@ fn make_peers_and_authority_ids(n: usize) -> Vec<(PeerId, AuthorityDiscoveryId)> fn make_gossip_topology( session: SessionIndex, - all_peers: &[(PeerId, AuthorityDiscoveryId)], + all_peers: &[(Option, AuthorityDiscoveryId)], neighbors_x: &[usize], neighbors_y: &[usize], local_index: usize, @@ -153,7 +153,7 @@ fn make_gossip_topology( assert!(all_peers.len() >= grid_size); let peer_info = |i: usize| TopologyPeerInfo { - peer_ids: vec![all_peers[i].0], + peer_ids: all_peers[i].0.clone().into_iter().collect_vec(), validator_index: ValidatorIndex::from(i as u32), discovery_id: all_peers[i].1.clone(), }; @@ -396,7 +396,15 @@ fn try_import_the_same_assignment() { // Set up a gossip topology, where a, b, c and d are topology neighboors to the node under // testing. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // new block `hash_a` with 1 candidates let meta = BlockApprovalMeta { @@ -485,7 +493,15 @@ fn try_import_the_same_assignment_v2() { // Set up a gossip topology, where a, b, c and d are topology neighboors to the node under // testing. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // new block `hash_a` with 1 candidates let meta = BlockApprovalMeta { @@ -724,8 +740,16 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() { let peer = &peer_a; setup_peer_with_view(overseer, peer, view![], ValidationVersion::V1).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Setup a topology where peer_a is neigboor to current node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0], &[2], 1)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; // new block `hash` with 1 candidates let meta = BlockApprovalMeta { @@ -822,8 +846,16 @@ fn import_approval_happy_path_v1_v2_peers() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology, where a, b, and c are topology neighboors to the node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // import an assignment related to `hash` locally let validator_index = ValidatorIndex(0); @@ -936,8 +968,16 @@ fn import_approval_happy_path_v2() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology, where a, b, and c are topology neighboors to the node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // import an assignment related to `hash` locally let validator_index = ValidatorIndex(0); @@ -1039,8 +1079,16 @@ fn multiple_assignments_covered_with_one_approval_vote() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology, where a, b, and c, d are topology neighboors to the node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // import an assignment related to `hash` locally let validator_index = ValidatorIndex(2); // peer_c is the originator @@ -1221,8 +1269,16 @@ fn unify_with_peer_multiple_assignments_covered_with_one_approval_vote() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology, where a, b, and c, d are topology neighboors to the node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // import an assignment related to `hash` locally let validator_index = ValidatorIndex(2); // peer_c is the originator @@ -1571,8 +1627,16 @@ fn update_peer_view() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Setup a topology where peer_a is neigboor to current node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0], &[2], 1)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; let cert_a = fake_assignment_cert(hash_a, ValidatorIndex(0)); let cert_b = fake_assignment_cert(hash_b, ValidatorIndex(0)); @@ -1694,6 +1758,174 @@ fn update_peer_view() { assert!(state.blocks.get(&hash_c).unwrap().known_by.get(peer).is_none()); } +// Tests that updating the known peer_id for a given authorithy updates the topology +// and sends the required messages +#[test] +fn update_peer_authority_id() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash_a = Hash::repeat_byte(0xAA); + let hash_b = Hash::repeat_byte(0xBB); + let hash_c = Hash::repeat_byte(0xCC); + let peers = make_peers_and_authority_ids(8); + // X neighbour, we simulate that PeerId is not known in the beginining. + let neighbour_x = peers.first().unwrap().0; + // Y neighbour, we simulate that PeerId is not known in the beginining. + let neighbour_y = peers.get(2).unwrap().0; + + let _state = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // new block `hash_a` with 1 candidates + let meta_a = BlockApprovalMeta { + hash: hash_a, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + let meta_b = BlockApprovalMeta { + hash: hash_b, + parent_hash: hash_a, + number: 2, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + let meta_c = BlockApprovalMeta { + hash: hash_c, + parent_hash: hash_b, + number: 3, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]); + overseer_send(overseer, msg).await; + + let peers_with_optional_peer_id = peers + .iter() + .enumerate() + .map(|(index, (peer_id, authority))| { + (if index == 0 { None } else { Some(*peer_id) }, authority.clone()) + }) + .collect_vec(); + + // Setup a topology where peer_a is neigboor to current node. + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; + + let cert_a = fake_assignment_cert(hash_a, ValidatorIndex(0)); + let cert_b = fake_assignment_cert(hash_b, ValidatorIndex(0)); + + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert_a.into(), 0.into()), + ) + .await; + + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert_b.into(), 0.into()), + ) + .await; + + // connect a peer + setup_peer_with_view(overseer, &neighbour_x, view![hash_a], ValidationVersion::V1).await; + setup_peer_with_view(overseer, &neighbour_y, view![hash_a], ValidationVersion::V1).await; + + setup_peer_with_view(overseer, &neighbour_x, view![hash_b], ValidationVersion::V1).await; + setup_peer_with_view(overseer, &neighbour_y, view![hash_b], ValidationVersion::V1).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + )) + )) => { + assert_eq!(peers.len(), 1); + assert_eq!(assignments.len(), 1); + assert_eq!(peers.get(0), Some(&neighbour_y)); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + )) + )) => { + assert_eq!(peers.len(), 1); + assert_eq!(assignments.len(), 1); + assert_eq!(peers.get(0), Some(&neighbour_y)); + } + ); + + overseer_send( + overseer, + ApprovalDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::UpdatedAuthorityIds( + peers[0].0, + [peers[0].1.clone()].into_iter().collect(), + ), + ), + ) + .await; + + // we should send relevant assignments to the peer, after we found it's peer id. + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + )) + )) => { + gum::info!(target: LOG_TARGET, ?peers, ?assignments); + assert_eq!(peers.len(), 1); + assert_eq!(assignments.len(), 2); + assert_eq!(assignments.get(0).unwrap().0.block_hash, hash_a); + assert_eq!(assignments.get(1).unwrap().0.block_hash, hash_b); + assert_eq!(peers.get(0), Some(&neighbour_x)); + } + ); + + overseer_send( + overseer, + ApprovalDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::UpdatedAuthorityIds( + peers[2].0, + [peers[2].1.clone()].into_iter().collect(), + ), + ), + ) + .await; + overseer_send( + overseer, + ApprovalDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::UpdatedAuthorityIds( + peers[0].0, + [peers[0].1.clone()].into_iter().collect(), + ), + ), + ) + .await; + assert!( + overseer.recv().timeout(TIMEOUT).await.is_none(), + "no message should be sent peers are already known" + ); + + virtual_overseer + }); +} + /// E.g. if someone copies the keys... #[test] fn import_remotely_then_locally() { @@ -1808,8 +2040,16 @@ fn sends_assignments_even_when_state_is_approved() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Setup a topology where peer_a is neigboor to current node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0], &[2], 1)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; let validator_index = ValidatorIndex(0); let candidate_index = 0u32; @@ -1900,8 +2140,16 @@ fn sends_assignments_even_when_state_is_approved_v2() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Setup a topology where peer_a is neigboor to current node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0], &[2], 1)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; let validator_index = ValidatorIndex(0); let cores = vec![0, 1, 2, 3]; @@ -2080,12 +2328,17 @@ fn propagates_locally_generated_assignment_to_both_dimensions() { setup_peer_with_view(overseer, peer, view![hash], ValidationVersion::V1).await; } + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + // Set up a gossip topology. setup_gossip_topology( overseer, make_gossip_topology( 1, - &peers, + &peers_with_optional_peer_id, &[0, 10, 20, 30, 40, 60, 70, 80], &[50, 51, 52, 53, 54, 55, 56, 57], 1, @@ -2197,10 +2450,21 @@ fn propagates_assignments_along_unshared_dimension() { setup_peer_with_view(overseer, peer, view![hash], ValidationVersion::V1).await; } + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -2339,13 +2603,16 @@ fn propagates_to_required_after_connect() { setup_peer_with_view(overseer, peer, view![hash], ValidationVersion::V1).await; } } - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, make_gossip_topology( 1, - &peers, + &peers_with_optional_peer_id, &[0, 10, 20, 30, 40, 60, 70, 80], &[50, 51, 52, 53, 54, 55, 56, 57], 1, @@ -2533,11 +2800,20 @@ fn sends_to_more_peers_after_getting_topology() { let approvals = vec![approval.clone()]; let expected_indices = vec![0, 10, 20, 30, 50, 51, 52, 53]; - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -2636,11 +2912,20 @@ fn originator_aggression_l1() { validator: validator_index, signature: dummy_signature(), }; - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -2795,11 +3080,20 @@ fn non_originator_aggression_l1() { // import an assignment and approval locally. let cert = fake_assignment_cert(hash, validator_index); - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -2900,11 +3194,20 @@ fn non_originator_aggression_l2() { // import an assignment and approval locally. let cert = fake_assignment_cert(hash, validator_index); - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -3046,11 +3349,20 @@ fn resends_messages_periodically() { for (peer, _) in &peers { setup_peer_with_view(overseer, peer, view![hash], ValidationVersion::V1).await; } - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -3190,7 +3502,15 @@ fn import_versioned_approval() { // Set up a gossip topology, where a, b, c and d are topology neighboors to the node under // testing. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // new block `hash_a` with 1 candidates let meta = BlockApprovalMeta { From a5098b5ca48294edd15ae5c358ea3c50138132aa Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 22 Jan 2024 12:07:38 +0200 Subject: [PATCH 04/11] Fix clippy Signed-off-by: Alexandru Gheorghe --- polkadot/node/network/approval-distribution/src/lib.rs | 5 ++--- polkadot/node/network/approval-distribution/src/tests.rs | 2 +- polkadot/node/network/protocol/src/grid_topology.rs | 3 --- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index 59b80c8d07cc..434868ccbc87 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -727,15 +727,14 @@ impl State { .blocks_by_number .iter() .filter(|(block_number, _)| *block_number > &view.finalized_number) - .map(|(_, hashes)| { + .flat_map(|(_, hashes)| { hashes.iter().filter(|hash| { self.blocks .get(&hash) .map(|block| block.known_by.get(&peer_id).is_some()) .unwrap_or_default() }) - }) - .flatten(); + }); let view_intersection = View::new(intersection.cloned(), view.finalized_number); Self::unify_with_peer( diff --git a/polkadot/node/network/approval-distribution/src/tests.rs b/polkadot/node/network/approval-distribution/src/tests.rs index 5c63d7138e5f..e11ef2073d4b 100644 --- a/polkadot/node/network/approval-distribution/src/tests.rs +++ b/polkadot/node/network/approval-distribution/src/tests.rs @@ -153,7 +153,7 @@ fn make_gossip_topology( assert!(all_peers.len() >= grid_size); let peer_info = |i: usize| TopologyPeerInfo { - peer_ids: all_peers[i].0.clone().into_iter().collect_vec(), + peer_ids: all_peers[i].0.into_iter().collect_vec(), validator_index: ValidatorIndex::from(i as u32), discovery_id: all_peers[i].1.clone(), }; diff --git a/polkadot/node/network/protocol/src/grid_topology.rs b/polkadot/node/network/protocol/src/grid_topology.rs index ca489dd21553..2954691fa76c 100644 --- a/polkadot/node/network/protocol/src/grid_topology.rs +++ b/polkadot/node/network/protocol/src/grid_topology.rs @@ -103,7 +103,6 @@ impl SessionGridTopology { { peer.peer_ids.push(peer_id); self.peer_ids.insert(peer_id); - return true; } } false @@ -383,7 +382,6 @@ impl SessionGridTopologies { let entry = self.inner.entry(session).or_insert((None, 0)); if entry.0.is_none() { let local_neighbors = local_index - .clone() .and_then(|l| topology.compute_grid_neighbors_for(l)) .unwrap_or_else(GridNeighbors::empty); @@ -461,7 +459,6 @@ impl SessionBoundGridTopologyStorage { local_index: Option, ) { let local_neighbors = local_index - .clone() .and_then(|l| topology.compute_grid_neighbors_for(l)) .unwrap_or_else(GridNeighbors::empty); From 3d8e65a328b8485a4bb09b3405624e8cbff53256 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 22 Jan 2024 12:11:21 +0200 Subject: [PATCH 05/11] Add some commnents. Signed-off-by: Alexandru Gheorghe --- polkadot/node/network/approval-distribution/src/lib.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index 434868ccbc87..b65c03bf99cf 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -668,7 +668,7 @@ impl State { ) { match event { NetworkBridgeEvent::PeerConnected(peer_id, role, version, authority_ids) => { - gum::info!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected"); + gum::debug!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected"); if let Some(authority_ids) = authority_ids { self.topologies.update_authority_ids(peer_id, &authority_ids); } @@ -720,7 +720,9 @@ impl State { self.process_incoming_peer_message(ctx, metrics, peer_id, message, rng).await; }, NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => { - gum::info!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids"); + gum::debug!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids"); + // If learnt about a new PeerId for an authority ids we need to try to route the + // messages that should have sent to that validator according to the topology. if self.topologies.update_authority_ids(peer_id, &authority_ids) { if let Some(PeerEntry { view, version }) = self.peer_views.get(&peer_id) { let intersection = self From d469351668be32a9c4cc3b6e04b5448a76896451 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 22 Jan 2024 15:01:41 +0200 Subject: [PATCH 06/11] Update other subsystems using old topology Signed-off-by: Alexandru Gheorghe --- polkadot/node/network/bitfield-distribution/src/lib.rs | 7 +++++-- .../network/statement-distribution/src/legacy_v1/mod.rs | 4 +++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 76baf499cad7..029401e0bd51 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -800,8 +800,11 @@ async fn handle_network_msg( }, NetworkBridgeEvent::PeerMessage(remote, message) => process_incoming_peer_message(ctx, state, metrics, remote, message, rng).await, - NetworkBridgeEvent::UpdatedAuthorityIds { .. } => { - // The bitfield-distribution subsystem doesn't deal with `AuthorityDiscoveryId`s. + NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => { + state + .topologies + .get_current_topology_mut() + .update_authority_ids(peer_id, &authority_ids); }, } } diff --git a/polkadot/node/network/statement-distribution/src/legacy_v1/mod.rs b/polkadot/node/network/statement-distribution/src/legacy_v1/mod.rs index 93f97fe1dd6e..e22883f89376 100644 --- a/polkadot/node/network/statement-distribution/src/legacy_v1/mod.rs +++ b/polkadot/node/network/statement-distribution/src/legacy_v1/mod.rs @@ -1892,7 +1892,9 @@ pub(crate) async fn handle_network_update( ?authority_ids, "Updated `AuthorityDiscoveryId`s" ); - + topology_storage + .get_current_topology_mut() + .update_authority_ids(peer, &authority_ids); // Remove the authority IDs which were previously mapped to the peer // but aren't part of the new set. authorities.retain(|a, p| p != &peer || authority_ids.contains(a)); From 9168120eb1ab74c02d11fdb8d1bb63922740a7be Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Tue, 23 Jan 2024 12:23:44 +0200 Subject: [PATCH 07/11] Fixup unittests Signed-off-by: Alexandru Gheorghe --- polkadot/node/network/protocol/src/grid_topology.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/polkadot/node/network/protocol/src/grid_topology.rs b/polkadot/node/network/protocol/src/grid_topology.rs index 2954691fa76c..a48d71d2889c 100644 --- a/polkadot/node/network/protocol/src/grid_topology.rs +++ b/polkadot/node/network/protocol/src/grid_topology.rs @@ -95,6 +95,7 @@ impl SessionGridTopology { peer_id: PeerId, ids: &HashSet, ) -> bool { + let mut updated = false; if !self.peer_ids.contains(&peer_id) { for peer in self .canonical_shuffling @@ -103,9 +104,10 @@ impl SessionGridTopology { { peer.peer_ids.push(peer_id); self.peer_ids.insert(peer_id); + updated = true; } } - false + updated } /// Produces the outgoing routing logic for a particular peer. /// From e8e99731fb7bae99c252acba6d31cbc5d346ff37 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Tue, 23 Jan 2024 12:25:17 +0200 Subject: [PATCH 08/11] Fixup comment Signed-off-by: Alexandru Gheorghe --- polkadot/node/network/approval-distribution/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index b65c03bf99cf..81e4ed7c0195 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -668,7 +668,7 @@ impl State { ) { match event { NetworkBridgeEvent::PeerConnected(peer_id, role, version, authority_ids) => { - gum::debug!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected"); + gum::trace!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected"); if let Some(authority_ids) = authority_ids { self.topologies.update_authority_ids(peer_id, &authority_ids); } @@ -721,7 +721,7 @@ impl State { }, NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => { gum::debug!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids"); - // If learnt about a new PeerId for an authority ids we need to try to route the + // If we learn about a new PeerId for an authority ids we need to try to route the // messages that should have sent to that validator according to the topology. if self.topologies.update_authority_ids(peer_id, &authority_ids) { if let Some(PeerEntry { view, version }) = self.peer_views.get(&peer_id) { From 4af961900f90a51bda11e8de0aacb16c126610fb Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Tue, 23 Jan 2024 12:29:16 +0200 Subject: [PATCH 09/11] Fixup comment Signed-off-by: Alexandru Gheorghe --- polkadot/node/network/approval-distribution/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index 81e4ed7c0195..8b377f684920 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -1899,7 +1899,7 @@ impl State { // mapping has already been sent all messages it's meant to get for that block and // all in-scope prior blocks. In case, we just learnt about its peer_id // authorithy-id mapping we have to retry sending the messages that should be sent - // to. + // to it for all un-finalized blocks. if entry.known_by.contains_key(&peer_id) && !retry_known_blocks { break } From 53ea9a3edb431d0d56b7638c1130483e49cbbdff Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Tue, 23 Jan 2024 12:34:38 +0200 Subject: [PATCH 10/11] Refactor test Signed-off-by: Alexandru Gheorghe --- .../approval-distribution/src/tests.rs | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/polkadot/node/network/approval-distribution/src/tests.rs b/polkadot/node/network/approval-distribution/src/tests.rs index e11ef2073d4b..11d82931651e 100644 --- a/polkadot/node/network/approval-distribution/src/tests.rs +++ b/polkadot/node/network/approval-distribution/src/tests.rs @@ -1767,10 +1767,13 @@ fn update_peer_authority_id() { let hash_b = Hash::repeat_byte(0xBB); let hash_c = Hash::repeat_byte(0xCC); let peers = make_peers_and_authority_ids(8); + let neighbour_x_index = 0; + let neighbour_y_index = 2; + let local_index = 1; // X neighbour, we simulate that PeerId is not known in the beginining. - let neighbour_x = peers.first().unwrap().0; + let neighbour_x = peers.get(neighbour_x_index).unwrap().0; // Y neighbour, we simulate that PeerId is not known in the beginining. - let neighbour_y = peers.get(2).unwrap().0; + let neighbour_y = peers.get(neighbour_y_index).unwrap().0; let _state = test_harness(State::default(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -1814,12 +1817,18 @@ fn update_peer_authority_id() { // Setup a topology where peer_a is neigboor to current node. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[neighbour_x_index], + &[neighbour_y_index], + local_index, + ), ) .await; - let cert_a = fake_assignment_cert(hash_a, ValidatorIndex(0)); - let cert_b = fake_assignment_cert(hash_b, ValidatorIndex(0)); + let cert_a = fake_assignment_cert(hash_a, ValidatorIndex(local_index as u32)); + let cert_b = fake_assignment_cert(hash_b, ValidatorIndex(local_index as u32)); overseer_send( overseer, @@ -1872,8 +1881,8 @@ fn update_peer_authority_id() { overseer, ApprovalDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::UpdatedAuthorityIds( - peers[0].0, - [peers[0].1.clone()].into_iter().collect(), + peers[neighbour_x_index].0, + [peers[neighbour_x_index].1.clone()].into_iter().collect(), ), ), ) @@ -1901,8 +1910,8 @@ fn update_peer_authority_id() { overseer, ApprovalDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::UpdatedAuthorityIds( - peers[2].0, - [peers[2].1.clone()].into_iter().collect(), + peers[neighbour_y_index].0, + [peers[neighbour_y_index].1.clone()].into_iter().collect(), ), ), ) @@ -1911,8 +1920,8 @@ fn update_peer_authority_id() { overseer, ApprovalDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::UpdatedAuthorityIds( - peers[0].0, - [peers[0].1.clone()].into_iter().collect(), + peers[neighbour_x_index].0, + [peers[neighbour_x_index].1.clone()].into_iter().collect(), ), ), ) From d9808325ea8db1f57ac9640d03cb352ab4f9f248 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Wed, 24 Jan 2024 13:21:43 +0200 Subject: [PATCH 11/11] Fixup unneeded unwrap Signed-off-by: Alexandru Gheorghe --- polkadot/node/network/protocol/src/grid_topology.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/polkadot/node/network/protocol/src/grid_topology.rs b/polkadot/node/network/protocol/src/grid_topology.rs index a48d71d2889c..3c4372a27a2c 100644 --- a/polkadot/node/network/protocol/src/grid_topology.rs +++ b/polkadot/node/network/protocol/src/grid_topology.rs @@ -324,9 +324,9 @@ impl SessionGridTopologyEntry { // neighbors_x and neighbors_y reflect the right peer ids. if peer_id_updated { if let Some(local_index) = self.local_index.as_ref() { - let grid_neighbors = - self.topology.compute_grid_neighbors_for(*local_index).unwrap(); - self.local_neighbors = grid_neighbors; + if let Some(new_grid) = self.topology.compute_grid_neighbors_for(*local_index) { + self.local_neighbors = new_grid; + } } } peer_id_updated