diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index d520febaef51..8b377f684920 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -667,9 +667,12 @@ impl State { rng: &mut (impl CryptoRng + Rng), ) { match event { - NetworkBridgeEvent::PeerConnected(peer_id, role, version, _) => { + NetworkBridgeEvent::PeerConnected(peer_id, role, version, authority_ids) => { + gum::trace!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected"); + if let Some(authority_ids) = authority_ids { + self.topologies.update_authority_ids(peer_id, &authority_ids); + } // insert a blank view if none already present - gum::trace!(target: LOG_TARGET, ?peer_id, ?role, "Peer connected"); self.peer_views .entry(peer_id) .or_insert(PeerEntry { view: Default::default(), version }); @@ -716,8 +719,41 @@ impl State { NetworkBridgeEvent::PeerMessage(peer_id, message) => { self.process_incoming_peer_message(ctx, metrics, peer_id, message, rng).await; }, - NetworkBridgeEvent::UpdatedAuthorityIds { .. } => { - // The approval-distribution subsystem doesn't deal with `AuthorityDiscoveryId`s. + NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => { + gum::debug!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids"); + // If we learn about a new PeerId for an authority ids we need to try to route the + // messages that should have sent to that validator according to the topology. + if self.topologies.update_authority_ids(peer_id, &authority_ids) { + if let Some(PeerEntry { view, version }) = self.peer_views.get(&peer_id) { + let intersection = self + .blocks_by_number + .iter() + .filter(|(block_number, _)| *block_number > &view.finalized_number) + .flat_map(|(_, hashes)| { + hashes.iter().filter(|hash| { + self.blocks + .get(&hash) + .map(|block| block.known_by.get(&peer_id).is_some()) + .unwrap_or_default() + }) + }); + let view_intersection = + View::new(intersection.cloned(), view.finalized_number); + Self::unify_with_peer( + ctx.sender(), + metrics, + &mut self.blocks, + &self.topologies, + self.peer_views.len(), + peer_id, + *version, + view_intersection, + rng, + true, + ) + .await; + } + } }, } } @@ -789,6 +825,7 @@ impl State { *version, view_intersection, rng, + false, ) .await; } @@ -1101,6 +1138,7 @@ impl State { protocol_version, view, rng, + false, ) .await; } @@ -1838,6 +1876,7 @@ impl State { protocol_version: ProtocolVersion, view: View, rng: &mut (impl CryptoRng + Rng), + retry_known_blocks: bool, ) { metrics.on_unify_with_peer(); let _timer = metrics.time_unify_with_peer(); @@ -1856,10 +1895,12 @@ impl State { _ => break, }; - // Any peer which is in the `known_by` set has already been - // sent all messages it's meant to get for that block and all - // in-scope prior blocks. - if entry.known_by.contains_key(&peer_id) { + // Any peer which is in the `known_by` see and we know its peer_id authorithy id + // mapping has already been sent all messages it's meant to get for that block and + // all in-scope prior blocks. In case, we just learnt about its peer_id + // authorithy-id mapping we have to retry sending the messages that should be sent + // to it for all un-finalized blocks. + if entry.known_by.contains_key(&peer_id) && !retry_known_blocks { break } diff --git a/polkadot/node/network/approval-distribution/src/tests.rs b/polkadot/node/network/approval-distribution/src/tests.rs index 7d933e2047f2..11d82931651e 100644 --- a/polkadot/node/network/approval-distribution/src/tests.rs +++ b/polkadot/node/network/approval-distribution/src/tests.rs @@ -130,7 +130,7 @@ fn make_peers_and_authority_ids(n: usize) -> Vec<(PeerId, AuthorityDiscoveryId)> fn make_gossip_topology( session: SessionIndex, - all_peers: &[(PeerId, AuthorityDiscoveryId)], + all_peers: &[(Option, AuthorityDiscoveryId)], neighbors_x: &[usize], neighbors_y: &[usize], local_index: usize, @@ -153,7 +153,7 @@ fn make_gossip_topology( assert!(all_peers.len() >= grid_size); let peer_info = |i: usize| TopologyPeerInfo { - peer_ids: vec![all_peers[i].0], + peer_ids: all_peers[i].0.into_iter().collect_vec(), validator_index: ValidatorIndex::from(i as u32), discovery_id: all_peers[i].1.clone(), }; @@ -396,7 +396,15 @@ fn try_import_the_same_assignment() { // Set up a gossip topology, where a, b, c and d are topology neighboors to the node under // testing. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // new block `hash_a` with 1 candidates let meta = BlockApprovalMeta { @@ -485,7 +493,15 @@ fn try_import_the_same_assignment_v2() { // Set up a gossip topology, where a, b, c and d are topology neighboors to the node under // testing. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // new block `hash_a` with 1 candidates let meta = BlockApprovalMeta { @@ -724,8 +740,16 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() { let peer = &peer_a; setup_peer_with_view(overseer, peer, view![], ValidationVersion::V1).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Setup a topology where peer_a is neigboor to current node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0], &[2], 1)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; // new block `hash` with 1 candidates let meta = BlockApprovalMeta { @@ -822,8 +846,16 @@ fn import_approval_happy_path_v1_v2_peers() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology, where a, b, and c are topology neighboors to the node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // import an assignment related to `hash` locally let validator_index = ValidatorIndex(0); @@ -936,8 +968,16 @@ fn import_approval_happy_path_v2() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology, where a, b, and c are topology neighboors to the node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // import an assignment related to `hash` locally let validator_index = ValidatorIndex(0); @@ -1039,8 +1079,16 @@ fn multiple_assignments_covered_with_one_approval_vote() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology, where a, b, and c, d are topology neighboors to the node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // import an assignment related to `hash` locally let validator_index = ValidatorIndex(2); // peer_c is the originator @@ -1221,8 +1269,16 @@ fn unify_with_peer_multiple_assignments_covered_with_one_approval_vote() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology, where a, b, and c, d are topology neighboors to the node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // import an assignment related to `hash` locally let validator_index = ValidatorIndex(2); // peer_c is the originator @@ -1571,8 +1627,16 @@ fn update_peer_view() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Setup a topology where peer_a is neigboor to current node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0], &[2], 1)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; let cert_a = fake_assignment_cert(hash_a, ValidatorIndex(0)); let cert_b = fake_assignment_cert(hash_b, ValidatorIndex(0)); @@ -1694,6 +1758,183 @@ fn update_peer_view() { assert!(state.blocks.get(&hash_c).unwrap().known_by.get(peer).is_none()); } +// Tests that updating the known peer_id for a given authorithy updates the topology +// and sends the required messages +#[test] +fn update_peer_authority_id() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash_a = Hash::repeat_byte(0xAA); + let hash_b = Hash::repeat_byte(0xBB); + let hash_c = Hash::repeat_byte(0xCC); + let peers = make_peers_and_authority_ids(8); + let neighbour_x_index = 0; + let neighbour_y_index = 2; + let local_index = 1; + // X neighbour, we simulate that PeerId is not known in the beginining. + let neighbour_x = peers.get(neighbour_x_index).unwrap().0; + // Y neighbour, we simulate that PeerId is not known in the beginining. + let neighbour_y = peers.get(neighbour_y_index).unwrap().0; + + let _state = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // new block `hash_a` with 1 candidates + let meta_a = BlockApprovalMeta { + hash: hash_a, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + let meta_b = BlockApprovalMeta { + hash: hash_b, + parent_hash: hash_a, + number: 2, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + let meta_c = BlockApprovalMeta { + hash: hash_c, + parent_hash: hash_b, + number: 3, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]); + overseer_send(overseer, msg).await; + + let peers_with_optional_peer_id = peers + .iter() + .enumerate() + .map(|(index, (peer_id, authority))| { + (if index == 0 { None } else { Some(*peer_id) }, authority.clone()) + }) + .collect_vec(); + + // Setup a topology where peer_a is neigboor to current node. + setup_gossip_topology( + overseer, + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[neighbour_x_index], + &[neighbour_y_index], + local_index, + ), + ) + .await; + + let cert_a = fake_assignment_cert(hash_a, ValidatorIndex(local_index as u32)); + let cert_b = fake_assignment_cert(hash_b, ValidatorIndex(local_index as u32)); + + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert_a.into(), 0.into()), + ) + .await; + + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert_b.into(), 0.into()), + ) + .await; + + // connect a peer + setup_peer_with_view(overseer, &neighbour_x, view![hash_a], ValidationVersion::V1).await; + setup_peer_with_view(overseer, &neighbour_y, view![hash_a], ValidationVersion::V1).await; + + setup_peer_with_view(overseer, &neighbour_x, view![hash_b], ValidationVersion::V1).await; + setup_peer_with_view(overseer, &neighbour_y, view![hash_b], ValidationVersion::V1).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + )) + )) => { + assert_eq!(peers.len(), 1); + assert_eq!(assignments.len(), 1); + assert_eq!(peers.get(0), Some(&neighbour_y)); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + )) + )) => { + assert_eq!(peers.len(), 1); + assert_eq!(assignments.len(), 1); + assert_eq!(peers.get(0), Some(&neighbour_y)); + } + ); + + overseer_send( + overseer, + ApprovalDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::UpdatedAuthorityIds( + peers[neighbour_x_index].0, + [peers[neighbour_x_index].1.clone()].into_iter().collect(), + ), + ), + ) + .await; + + // we should send relevant assignments to the peer, after we found it's peer id. + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + )) + )) => { + gum::info!(target: LOG_TARGET, ?peers, ?assignments); + assert_eq!(peers.len(), 1); + assert_eq!(assignments.len(), 2); + assert_eq!(assignments.get(0).unwrap().0.block_hash, hash_a); + assert_eq!(assignments.get(1).unwrap().0.block_hash, hash_b); + assert_eq!(peers.get(0), Some(&neighbour_x)); + } + ); + + overseer_send( + overseer, + ApprovalDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::UpdatedAuthorityIds( + peers[neighbour_y_index].0, + [peers[neighbour_y_index].1.clone()].into_iter().collect(), + ), + ), + ) + .await; + overseer_send( + overseer, + ApprovalDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::UpdatedAuthorityIds( + peers[neighbour_x_index].0, + [peers[neighbour_x_index].1.clone()].into_iter().collect(), + ), + ), + ) + .await; + assert!( + overseer.recv().timeout(TIMEOUT).await.is_none(), + "no message should be sent peers are already known" + ); + + virtual_overseer + }); +} + /// E.g. if someone copies the keys... #[test] fn import_remotely_then_locally() { @@ -1808,8 +2049,16 @@ fn sends_assignments_even_when_state_is_approved() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Setup a topology where peer_a is neigboor to current node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0], &[2], 1)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; let validator_index = ValidatorIndex(0); let candidate_index = 0u32; @@ -1900,8 +2149,16 @@ fn sends_assignments_even_when_state_is_approved_v2() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Setup a topology where peer_a is neigboor to current node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0], &[2], 1)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; let validator_index = ValidatorIndex(0); let cores = vec![0, 1, 2, 3]; @@ -2080,12 +2337,17 @@ fn propagates_locally_generated_assignment_to_both_dimensions() { setup_peer_with_view(overseer, peer, view![hash], ValidationVersion::V1).await; } + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + // Set up a gossip topology. setup_gossip_topology( overseer, make_gossip_topology( 1, - &peers, + &peers_with_optional_peer_id, &[0, 10, 20, 30, 40, 60, 70, 80], &[50, 51, 52, 53, 54, 55, 56, 57], 1, @@ -2197,10 +2459,21 @@ fn propagates_assignments_along_unshared_dimension() { setup_peer_with_view(overseer, peer, view![hash], ValidationVersion::V1).await; } + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -2339,13 +2612,16 @@ fn propagates_to_required_after_connect() { setup_peer_with_view(overseer, peer, view![hash], ValidationVersion::V1).await; } } - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, make_gossip_topology( 1, - &peers, + &peers_with_optional_peer_id, &[0, 10, 20, 30, 40, 60, 70, 80], &[50, 51, 52, 53, 54, 55, 56, 57], 1, @@ -2533,11 +2809,20 @@ fn sends_to_more_peers_after_getting_topology() { let approvals = vec![approval.clone()]; let expected_indices = vec![0, 10, 20, 30, 50, 51, 52, 53]; - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -2636,11 +2921,20 @@ fn originator_aggression_l1() { validator: validator_index, signature: dummy_signature(), }; - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -2795,11 +3089,20 @@ fn non_originator_aggression_l1() { // import an assignment and approval locally. let cert = fake_assignment_cert(hash, validator_index); - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -2900,11 +3203,20 @@ fn non_originator_aggression_l2() { // import an assignment and approval locally. let cert = fake_assignment_cert(hash, validator_index); - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -3046,11 +3358,20 @@ fn resends_messages_periodically() { for (peer, _) in &peers { setup_peer_with_view(overseer, peer, view![hash], ValidationVersion::V1).await; } - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -3190,7 +3511,15 @@ fn import_versioned_approval() { // Set up a gossip topology, where a, b, c and d are topology neighboors to the node under // testing. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // new block `hash_a` with 1 candidates let meta = BlockApprovalMeta { diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 76baf499cad7..029401e0bd51 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -800,8 +800,11 @@ async fn handle_network_msg( }, NetworkBridgeEvent::PeerMessage(remote, message) => process_incoming_peer_message(ctx, state, metrics, remote, message, rng).await, - NetworkBridgeEvent::UpdatedAuthorityIds { .. } => { - // The bitfield-distribution subsystem doesn't deal with `AuthorityDiscoveryId`s. + NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => { + state + .topologies + .get_current_topology_mut() + .update_authority_ids(peer_id, &authority_ids); }, } } diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index 22417795d5ea..86245479baf7 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -270,9 +270,10 @@ where session_index, ) .await?; - - self.update_authority_ids(sender, session_info.discovery_keys).await; } + // authority_discovery is just a cache so let's try every leaf to detect if there + // are new authorities there. + self.update_authority_ids(sender, session_info.discovery_keys).await; } } Ok(()) diff --git a/polkadot/node/network/protocol/src/grid_topology.rs b/polkadot/node/network/protocol/src/grid_topology.rs index 8bd9adbc17c1..3c4372a27a2c 100644 --- a/polkadot/node/network/protocol/src/grid_topology.rs +++ b/polkadot/node/network/protocol/src/grid_topology.rs @@ -89,6 +89,26 @@ impl SessionGridTopology { SessionGridTopology { shuffled_indices, canonical_shuffling, peer_ids } } + /// Updates the known peer ids for the passed authorithies ids. + pub fn update_authority_ids( + &mut self, + peer_id: PeerId, + ids: &HashSet, + ) -> bool { + let mut updated = false; + if !self.peer_ids.contains(&peer_id) { + for peer in self + .canonical_shuffling + .iter_mut() + .filter(|peer| ids.contains(&peer.discovery_id)) + { + peer.peer_ids.push(peer_id); + self.peer_ids.insert(peer_id); + updated = true; + } + } + updated + } /// Produces the outgoing routing logic for a particular peer. /// /// Returns `None` if the validator index is out of bounds. @@ -269,6 +289,7 @@ impl GridNeighbors { pub struct SessionGridTopologyEntry { topology: SessionGridTopology, local_neighbors: GridNeighbors, + local_index: Option, } impl SessionGridTopologyEntry { @@ -291,6 +312,25 @@ impl SessionGridTopologyEntry { pub fn is_validator(&self, peer: &PeerId) -> bool { self.topology.is_validator(peer) } + + /// Updates the known peer ids for the passed authorithies ids. + pub fn update_authority_ids( + &mut self, + peer_id: PeerId, + ids: &HashSet, + ) -> bool { + let peer_id_updated = self.topology.update_authority_ids(peer_id, ids); + // If we added a new peer id we need to recompute the grid neighbors, so that + // neighbors_x and neighbors_y reflect the right peer ids. + if peer_id_updated { + if let Some(local_index) = self.local_index.as_ref() { + if let Some(new_grid) = self.topology.compute_grid_neighbors_for(*local_index) { + self.local_neighbors = new_grid; + } + } + } + peer_id_updated + } } /// A set of topologies indexed by session @@ -305,6 +345,20 @@ impl SessionGridTopologies { self.inner.get(&session).and_then(|val| val.0.as_ref()) } + /// Updates the known peer ids for the passed authorithies ids. + pub fn update_authority_ids( + &mut self, + peer_id: PeerId, + ids: &HashSet, + ) -> bool { + self.inner + .iter_mut() + .map(|(_, topology)| { + topology.0.as_mut().map(|topology| topology.update_authority_ids(peer_id, ids)) + }) + .any(|updated| updated.unwrap_or_default()) + } + /// Increase references counter for a specific topology pub fn inc_session_refs(&mut self, session: SessionIndex) { self.inner.entry(session).or_insert((None, 0)).1 += 1; @@ -333,7 +387,7 @@ impl SessionGridTopologies { .and_then(|l| topology.compute_grid_neighbors_for(l)) .unwrap_or_else(GridNeighbors::empty); - entry.0 = Some(SessionGridTopologyEntry { topology, local_neighbors }); + entry.0 = Some(SessionGridTopologyEntry { topology, local_neighbors, local_index }); } } } @@ -368,6 +422,7 @@ impl Default for SessionBoundGridTopologyStorage { peer_ids: Default::default(), }, local_neighbors: GridNeighbors::empty(), + local_index: None, }, }, prev_topology: None, @@ -412,7 +467,7 @@ impl SessionBoundGridTopologyStorage { let old_current = std::mem::replace( &mut self.current_topology, GridTopologySessionBound { - entry: SessionGridTopologyEntry { topology, local_neighbors }, + entry: SessionGridTopologyEntry { topology, local_neighbors, local_index }, session_index, }, ); diff --git a/polkadot/node/network/statement-distribution/src/legacy_v1/mod.rs b/polkadot/node/network/statement-distribution/src/legacy_v1/mod.rs index 93f97fe1dd6e..e22883f89376 100644 --- a/polkadot/node/network/statement-distribution/src/legacy_v1/mod.rs +++ b/polkadot/node/network/statement-distribution/src/legacy_v1/mod.rs @@ -1892,7 +1892,9 @@ pub(crate) async fn handle_network_update( ?authority_ids, "Updated `AuthorityDiscoveryId`s" ); - + topology_storage + .get_current_topology_mut() + .update_authority_ids(peer, &authority_ids); // Remove the authority IDs which were previously mapped to the peer // but aren't part of the new set. authorities.retain(|a, p| p != &peer || authority_ids.contains(a));