From 1892aa1d8f50fd0e3eea2c313933ab4e37742745 Mon Sep 17 00:00:00 2001 From: Alin Dima Date: Tue, 21 May 2024 11:14:42 +0300 Subject: [PATCH] Remove the prospective-parachains subsystem from collators (#4471) Implements https://github.com/paritytech/polkadot-sdk/issues/4429 Collators only need to maintain the implicit view for the paraid they are collating on. In this case, bypass prospective-parachains entirely. It's still useful to use the GetMinimumRelayParents message from prospective-parachains for validators, because the data is already present there. This enables us to entirely remove the subsystem from collators, which consumed resources needlessly Aims to resolve https://github.com/paritytech/polkadot-sdk/issues/4167 TODO: - [x] fix unit tests --- .../src/tests/prospective_parachains.rs | 21 +- .../src/collator_side/mod.rs | 58 ++- .../tests/prospective_parachains.rs | 102 +++- .../tests/prospective_parachains.rs | 20 +- .../src/v2/tests/mod.rs | 26 +- polkadot/node/service/src/overseer.rs | 4 +- .../src/backing_implicit_view.rs | 463 +++++++++++++++--- prdoc/pr_4471.prdoc | 16 + 8 files changed, 581 insertions(+), 129 deletions(-) create mode 100644 prdoc/pr_4471.prdoc diff --git a/polkadot/node/core/backing/src/tests/prospective_parachains.rs b/polkadot/node/core/backing/src/tests/prospective_parachains.rs index 8a72902f08150..c93cf21ef7d8e 100644 --- a/polkadot/node/core/backing/src/tests/prospective_parachains.rs +++ b/polkadot/node/core/backing/src/tests/prospective_parachains.rs @@ -67,15 +67,6 @@ async fn activate_leaf( .min() .unwrap_or(&leaf_number); - assert_matches!( - virtual_overseer.recv().await, - AllMessages::ProspectiveParachains( - ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx) - ) if parent == leaf_hash => { - tx.send(min_relay_parents).unwrap(); - } - ); - let ancestry_len = leaf_number + 1 - min_min; let ancestry_hashes = std::iter::successors(Some(leaf_hash), |h| Some(get_parent_hash(*h))) @@ -117,6 +108,18 @@ async fn activate_leaf( tx.send(Ok(Some(header))).unwrap(); } ); + + if requested_len == 0 { + assert_matches!( + virtual_overseer.recv().await, + AllMessages::ProspectiveParachains( + ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx) + ) if parent == leaf_hash => { + tx.send(min_relay_parents.clone()).unwrap(); + } + ); + } + requested_len += 1; } } diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index f227e3855fa0a..88375d5830904 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -264,7 +264,9 @@ struct State { /// never included in the fragment chains of active leaves which do. In /// particular, this means that if a given relay parent belongs to implicit /// ancestry of some active leaf, then it does support prospective parachains. - implicit_view: ImplicitView, + /// + /// It's `None` if the collator is not yet collating for a paraid. + implicit_view: Option, /// All active leaves observed by us, including both that do and do not /// support prospective parachains. This mapping works as a replacement for @@ -334,7 +336,7 @@ impl State { metrics, collating_on: Default::default(), peer_data: Default::default(), - implicit_view: Default::default(), + implicit_view: None, active_leaves: Default::default(), per_relay_parent: Default::default(), span_per_relay_parent: Default::default(), @@ -539,11 +541,12 @@ async fn distribute_collation( .filter(|(_, PeerData { view: v, .. })| match relay_parent_mode { ProspectiveParachainsMode::Disabled => v.contains(&candidate_relay_parent), ProspectiveParachainsMode::Enabled { .. } => v.iter().any(|block_hash| { - state - .implicit_view - .known_allowed_relay_parents_under(block_hash, Some(id)) - .unwrap_or_default() - .contains(&candidate_relay_parent) + state.implicit_view.as_ref().map(|implicit_view| { + implicit_view + .known_allowed_relay_parents_under(block_hash, Some(id)) + .unwrap_or_default() + .contains(&candidate_relay_parent) + }) == Some(true) }), }); @@ -830,6 +833,7 @@ async fn process_msg( match msg { CollateOn(id) => { state.collating_on = Some(id); + state.implicit_view = Some(ImplicitView::new(Some(id))); }, DistributeCollation { candidate_receipt, @@ -1215,7 +1219,10 @@ async fn handle_peer_view_change( Some(ProspectiveParachainsMode::Disabled) => std::slice::from_ref(&added), Some(ProspectiveParachainsMode::Enabled { .. }) => state .implicit_view - .known_allowed_relay_parents_under(&added, state.collating_on) + .as_ref() + .and_then(|implicit_view| { + implicit_view.known_allowed_relay_parents_under(&added, state.collating_on) + }) .unwrap_or_default(), None => { gum::trace!( @@ -1353,21 +1360,22 @@ where state.per_relay_parent.insert(*leaf, PerRelayParent::new(mode)); if mode.is_enabled() { - state - .implicit_view - .activate_leaf(sender, *leaf) - .await - .map_err(Error::ImplicitViewFetchError)?; + if let Some(ref mut implicit_view) = state.implicit_view { + implicit_view + .activate_leaf(sender, *leaf) + .await + .map_err(Error::ImplicitViewFetchError)?; - let allowed_ancestry = state - .implicit_view - .known_allowed_relay_parents_under(leaf, state.collating_on) - .unwrap_or_default(); - for block_hash in allowed_ancestry { - state - .per_relay_parent - .entry(*block_hash) - .or_insert_with(|| PerRelayParent::new(mode)); + let allowed_ancestry = implicit_view + .known_allowed_relay_parents_under(leaf, state.collating_on) + .unwrap_or_default(); + + for block_hash in allowed_ancestry { + state + .per_relay_parent + .entry(*block_hash) + .or_insert_with(|| PerRelayParent::new(mode)); + } } } } @@ -1378,7 +1386,11 @@ where // of implicit ancestry. Only update the state after the hash is actually // pruned from the block info storage. let pruned = if mode.is_enabled() { - state.implicit_view.deactivate_leaf(*leaf) + state + .implicit_view + .as_mut() + .map(|view| view.deactivate_leaf(*leaf)) + .unwrap_or_default() } else { vec![*leaf] }; diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs index 707053545630a..2a147aef69e2f 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs @@ -18,7 +18,7 @@ use super::*; -use polkadot_node_subsystem::messages::{ChainApiMessage, ProspectiveParachainsMessage}; +use polkadot_node_subsystem::messages::ChainApiMessage; use polkadot_primitives::{AsyncBackingParams, Header, OccupiedCore}; const ASYNC_BACKING_PARAMETERS: AsyncBackingParams = @@ -31,7 +31,6 @@ fn get_parent_hash(hash: Hash) -> Hash { /// Handle a view update. async fn update_view( virtual_overseer: &mut VirtualOverseer, - test_state: &TestState, new_view: Vec<(Hash, u32)>, // Hash and block number. activated: u8, // How many new heads does this update contain? ) { @@ -61,21 +60,88 @@ async fn update_view( let min_number = leaf_number.saturating_sub(ASYNC_BACKING_PARAMETERS.allowed_ancestry_len); - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::ProspectiveParachains( - ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx), - ) if parent == leaf_hash => { - tx.send(vec![(test_state.para_id, min_number)]).unwrap(); - } - ); - let ancestry_len = leaf_number + 1 - min_number; let ancestry_hashes = std::iter::successors(Some(leaf_hash), |h| Some(get_parent_hash(*h))) .take(ancestry_len as usize); let ancestry_numbers = (min_number..=leaf_number).rev(); let mut ancestry_iter = ancestry_hashes.clone().zip(ancestry_numbers).peekable(); + if let Some((hash, number)) = ancestry_iter.next() { + assert_matches!( + overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await.unwrap(), + AllMessages::ChainApi(ChainApiMessage::BlockHeader(.., tx)) => { + let header = Header { + parent_hash: get_parent_hash(hash), + number, + state_root: Hash::zero(), + extrinsics_root: Hash::zero(), + digest: Default::default(), + }; + + tx.send(Ok(Some(header))).unwrap(); + } + ); + + assert_matches!( + overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await.unwrap(), + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + .., + RuntimeApiRequest::AsyncBackingParams( + tx + ) + ) + ) => { + tx.send(Ok(ASYNC_BACKING_PARAMETERS)).unwrap(); + } + ); + + assert_matches!( + overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await.unwrap(), + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + .., + RuntimeApiRequest::SessionIndexForChild( + tx + ) + ) + ) => { + tx.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await.unwrap(), + AllMessages::ChainApi( + ChainApiMessage::Ancestors { + k, + response_channel: tx, + .. + } + ) => { + assert_eq!(k, ASYNC_BACKING_PARAMETERS.allowed_ancestry_len as usize); + + tx.send(Ok(ancestry_hashes.clone().skip(1).into_iter().collect())).unwrap(); + } + ); + } + + for _ in ancestry_iter.clone() { + assert_matches!( + overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await.unwrap(), + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + .., + RuntimeApiRequest::SessionIndexForChild( + tx + ) + ) + ) => { + tx.send(Ok(1)).unwrap(); + } + ); + } + while let Some((hash, number)) = ancestry_iter.next() { // May be `None` for the last element. let parent_hash = @@ -195,7 +261,7 @@ fn distribute_collation_from_implicit_view() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; // Activated leaf is `b`, but the collation will be based on `c`. - update_view(virtual_overseer, &test_state, vec![(head_b, head_b_num)], 1).await; + update_view(virtual_overseer, vec![(head_b, head_b_num)], 1).await; let validator_peer_ids = test_state.current_group_validator_peer_ids(); for (val, peer) in test_state @@ -258,7 +324,7 @@ fn distribute_collation_from_implicit_view() { // Head `c` goes out of view. // Build a different candidate for this relay parent and attempt to distribute it. - update_view(virtual_overseer, &test_state, vec![(head_a, head_a_num)], 1).await; + update_view(virtual_overseer, vec![(head_a, head_a_num)], 1).await; let pov = PoV { block_data: BlockData(vec![4, 5, 6]) }; let parent_head_data_hash = Hash::repeat_byte(0xBB); @@ -318,7 +384,7 @@ fn distribute_collation_up_to_limit() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; // Activated leaf is `a`, but the collation will be based on `b`. - update_view(virtual_overseer, &test_state, vec![(head_a, head_a_num)], 1).await; + update_view(virtual_overseer, vec![(head_a, head_a_num)], 1).await; for i in 0..(ASYNC_BACKING_PARAMETERS.max_candidate_depth + 1) { let pov = PoV { block_data: BlockData(vec![i as u8]) }; @@ -402,7 +468,7 @@ fn send_parent_head_data_for_elastic_scaling() { CollatorProtocolMessage::CollateOn(test_state.para_id), ) .await; - update_view(&mut virtual_overseer, &test_state, vec![(head_b, head_b_num)], 1).await; + update_view(&mut virtual_overseer, vec![(head_b, head_b_num)], 1).await; let pov_data = PoV { block_data: BlockData(vec![1 as u8]) }; let candidate = TestCandidateBuilder { @@ -517,8 +583,8 @@ fn advertise_and_send_collation_by_hash() { CollatorProtocolMessage::CollateOn(test_state.para_id), ) .await; - update_view(&mut virtual_overseer, &test_state, vec![(head_b, head_b_num)], 1).await; - update_view(&mut virtual_overseer, &test_state, vec![(head_a, head_a_num)], 1).await; + update_view(&mut virtual_overseer, vec![(head_b, head_b_num)], 1).await; + update_view(&mut virtual_overseer, vec![(head_a, head_a_num)], 1).await; let candidates: Vec<_> = (0..2) .map(|i| { @@ -638,7 +704,7 @@ fn advertise_core_occupied() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; // Activated leaf is `a`, but the collation will be based on `b`. - update_view(virtual_overseer, &test_state, vec![(head_a, head_a_num)], 1).await; + update_view(virtual_overseer, vec![(head_a, head_a_num)], 1).await; let pov = PoV { block_data: BlockData(vec![1, 2, 3]) }; let candidate = TestCandidateBuilder { diff --git a/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs index 785690121dadd..178dcb85e035f 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs @@ -116,15 +116,6 @@ pub(super) async fn update_view( let min_number = leaf_number.saturating_sub(ASYNC_BACKING_PARAMETERS.allowed_ancestry_len); - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::ProspectiveParachains( - ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx), - ) if parent == leaf_hash => { - tx.send(test_state.chain_ids.iter().map(|para_id| (*para_id, min_number)).collect()).unwrap(); - } - ); - let ancestry_len = leaf_number + 1 - min_number; let ancestry_hashes = std::iter::successors(Some(leaf_hash), |h| Some(get_parent_hash(*h))) .take(ancestry_len as usize); @@ -166,6 +157,17 @@ pub(super) async fn update_view( } ); + if requested_len == 0 { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ProspectiveParachains( + ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx), + ) if parent == leaf_hash => { + tx.send(test_state.chain_ids.iter().map(|para_id| (*para_id, min_number)).collect()).unwrap(); + } + ); + } + requested_len += 1; } } diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs b/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs index d32e2323ba346..f9a484f47a94c 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs @@ -586,19 +586,6 @@ async fn handle_leaf_activation( } ); - let mrp_response: Vec<(ParaId, BlockNumber)> = para_data - .iter() - .map(|(para_id, data)| (*para_id, data.min_relay_parent)) - .collect(); - assert_matches!( - virtual_overseer.recv().await, - AllMessages::ProspectiveParachains( - ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx) - ) if parent == *hash => { - tx.send(mrp_response).unwrap(); - } - ); - let header = Header { parent_hash: *parent_hash, number: *number, @@ -615,6 +602,19 @@ async fn handle_leaf_activation( } ); + let mrp_response: Vec<(ParaId, BlockNumber)> = para_data + .iter() + .map(|(para_id, data)| (*para_id, data.min_relay_parent)) + .collect(); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::ProspectiveParachains( + ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx) + ) if parent == *hash => { + tx.send(mrp_response).unwrap(); + } + ); + loop { match virtual_overseer.recv().await { AllMessages::RuntimeApi(RuntimeApiMessage::Request( diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs index 26b1446bf515a..4b7777a096714 100644 --- a/polkadot/node/service/src/overseer.rs +++ b/polkadot/node/service/src/overseer.rs @@ -385,7 +385,7 @@ pub fn collator_overseer_builder( DummySubsystem, DummySubsystem, DummySubsystem, - ProspectiveParachainsSubsystem, + DummySubsystem, >, Error, > @@ -462,7 +462,7 @@ where .dispute_coordinator(DummySubsystem) .dispute_distribution(DummySubsystem) .chain_selection(DummySubsystem) - .prospective_parachains(ProspectiveParachainsSubsystem::new(Metrics::register(registry)?)) + .prospective_parachains(DummySubsystem) .activation_external_listeners(Default::default()) .span_per_active_leaf(Default::default()) .active_leaves(Default::default()) diff --git a/polkadot/node/subsystem-util/src/backing_implicit_view.rs b/polkadot/node/subsystem-util/src/backing_implicit_view.rs index a14536a17666c..23a758d25715b 100644 --- a/polkadot/node/subsystem-util/src/backing_implicit_view.rs +++ b/polkadot/node/subsystem-util/src/backing_implicit_view.rs @@ -17,23 +17,45 @@ use futures::channel::oneshot; use polkadot_node_subsystem::{ errors::ChainApiError, - messages::{ChainApiMessage, ProspectiveParachainsMessage}, + messages::{ChainApiMessage, ProspectiveParachainsMessage, RuntimeApiMessage}, SubsystemSender, }; use polkadot_primitives::{BlockNumber, Hash, Id as ParaId}; use std::collections::HashMap; +use crate::{ + request_session_index_for_child, + runtime::{self, prospective_parachains_mode, recv_runtime, ProspectiveParachainsMode}, +}; + // Always aim to retain 1 block before the active leaves. const MINIMUM_RETAIN_LENGTH: BlockNumber = 2; /// Handles the implicit view of the relay chain derived from the immediate view, which /// is composed of active leaves, and the minimum relay-parents allowed for /// candidates of various parachains at those leaves. -#[derive(Default, Clone)] +#[derive(Clone)] pub struct View { leaves: HashMap, block_info_storage: HashMap, + collating_for: Option, +} + +impl View { + /// Create a new empty view. + /// If `collating_for` is `Some`, the node is a collator and is only interested in the allowed + /// relay parents of a single paraid. When this is true, prospective-parachains is no longer + /// queried. + pub fn new(collating_for: Option) -> Self { + Self { leaves: Default::default(), block_info_storage: Default::default(), collating_for } + } +} + +impl Default for View { + fn default() -> Self { + Self::new(None) + } } // Minimum relay parents implicitly relative to a particular block. @@ -106,15 +128,13 @@ impl View { } /// Activate a leaf in the view. - /// This will request the minimum relay parents from the - /// Prospective Parachains subsystem for each leaf and will load headers in the ancestry of each - /// leaf in the view as needed. These are the 'implicit ancestors' of the leaf. + /// This will request the minimum relay parents the leaf and will load headers in the + /// ancestry of the leaf as needed. These are the 'implicit ancestors' of the leaf. /// /// To maximize reuse of outdated leaves, it's best to activate new leaves before /// deactivating old ones. /// - /// This returns a list of para-ids which are relevant to the leaf, - /// and the allowed relay parents for these paras under this leaf can be + /// The allowed relay parents for the relevant paras under this leaf can be /// queried with [`View::known_allowed_relay_parents_under`]. /// /// No-op for known leaves. @@ -122,10 +142,11 @@ impl View { &mut self, sender: &mut Sender, leaf_hash: Hash, - ) -> Result, FetchError> + ) -> Result<(), FetchError> where - Sender: SubsystemSender, - Sender: SubsystemSender, + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender, { if self.leaves.contains_key(&leaf_hash) { return Err(FetchError::AlreadyKnown) @@ -135,6 +156,7 @@ impl View { leaf_hash, &mut self.block_info_storage, &mut *sender, + self.collating_for, ) .await; @@ -150,7 +172,7 @@ impl View { self.leaves.insert(leaf_hash, ActiveLeafPruningInfo { retain_minimum }); - Ok(fetched.relevant_paras) + Ok(()) }, Err(e) => Err(e), } @@ -249,6 +271,10 @@ pub enum FetchError { /// Request to the Chain API subsystem failed. #[error("The chain API subsystem was unavailable")] ChainApiUnavailable, + + /// Request to the runtime API failed. + #[error("Runtime API error: {0}")] + RuntimeApi(#[from] runtime::Error), } /// Reasons a block header might have been unavailable. @@ -265,30 +291,92 @@ pub enum BlockHeaderUnavailableReason { struct FetchSummary { minimum_ancestor_number: BlockNumber, leaf_number: BlockNumber, - relevant_paras: Vec, } -async fn fetch_fresh_leaf_and_insert_ancestry( +// Request the min relay parents from prospective-parachains. +async fn fetch_min_relay_parents_from_prospective_parachains< + Sender: SubsystemSender, +>( leaf_hash: Hash, - block_info_storage: &mut HashMap, sender: &mut Sender, -) -> Result +) -> Result, FetchError> { + let (tx, rx) = oneshot::channel(); + sender + .send_message(ProspectiveParachainsMessage::GetMinimumRelayParents(leaf_hash, tx)) + .await; + + rx.await.map_err(|_| FetchError::ProspectiveParachainsUnavailable) +} + +// Request the min relay parent for the purposes of a collator, directly using ChainApi (where +// prospective-parachains is not available). +async fn fetch_min_relay_parents_for_collator( + leaf_hash: Hash, + leaf_number: BlockNumber, + sender: &mut Sender, +) -> Result, FetchError> where - Sender: SubsystemSender, - Sender: SubsystemSender, + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender, { - let min_relay_parents_raw = { - let (tx, rx) = oneshot::channel(); - sender - .send_message(ProspectiveParachainsMessage::GetMinimumRelayParents(leaf_hash, tx)) - .await; + let Ok(ProspectiveParachainsMode::Enabled { allowed_ancestry_len, .. }) = + prospective_parachains_mode(sender, leaf_hash).await + else { + // This should never happen, leaves that don't have prospective parachains mode enabled + // should not use implicit view. + return Ok(None) + }; - match rx.await { - Ok(m) => m, - Err(_) => return Err(FetchError::ProspectiveParachainsUnavailable), + // Fetch the session of the leaf. We must make sure that we stop at the ancestor which has a + // different session index. + let required_session = + recv_runtime(request_session_index_for_child(leaf_hash, sender).await).await?; + + let mut min = leaf_number; + + // Fetch the ancestors, up to allowed_ancestry_len. + let (tx, rx) = oneshot::channel(); + sender + .send_message(ChainApiMessage::Ancestors { + hash: leaf_hash, + k: allowed_ancestry_len, + response_channel: tx, + }) + .await; + let hashes = rx + .await + .map_err(|_| FetchError::ChainApiUnavailable)? + .map_err(|err| FetchError::ChainApiError(leaf_hash, err))?; + + for hash in hashes { + // The relay chain cannot accept blocks backed from previous sessions, with + // potentially previous validators. This is a technical limitation we need to + // respect here. + let session = recv_runtime(request_session_index_for_child(hash, sender).await).await?; + + if session == required_session { + // We should never underflow here, the ChainAPI stops at genesis block. + min = min.saturating_sub(1); + } else { + break } - }; + } + Ok(Some(min)) +} + +async fn fetch_fresh_leaf_and_insert_ancestry( + leaf_hash: Hash, + block_info_storage: &mut HashMap, + sender: &mut Sender, + collating_for: Option, +) -> Result +where + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender, +{ let leaf_header = { let (tx, rx) = oneshot::channel(); sender.send_message(ChainApiMessage::BlockHeader(leaf_hash, tx)).await; @@ -313,8 +401,18 @@ where } }; - let min_min = min_relay_parents_raw.iter().map(|x| x.1).min().unwrap_or(leaf_header.number); - let relevant_paras = min_relay_parents_raw.iter().map(|x| x.0).collect(); + // If the node is a collator, bypass prospective-parachains. We're only interested in the one + // paraid and the subsystem is not present. + let min_relay_parents = if let Some(para_id) = collating_for { + fetch_min_relay_parents_for_collator(leaf_hash, leaf_header.number, sender) + .await? + .map(|x| vec![(para_id, x)]) + .unwrap_or_default() + } else { + fetch_min_relay_parents_from_prospective_parachains(leaf_hash, sender).await? + }; + + let min_min = min_relay_parents.iter().map(|x| x.1).min().unwrap_or(leaf_header.number); let expected_ancestry_len = (leaf_header.number.saturating_sub(min_min) as usize) + 1; let ancestry = if leaf_header.number > 0 { @@ -380,14 +478,11 @@ where vec![leaf_hash] }; - let fetched_ancestry = FetchSummary { - minimum_ancestor_number: min_min, - leaf_number: leaf_header.number, - relevant_paras, - }; + let fetched_ancestry = + FetchSummary { minimum_ancestor_number: min_min, leaf_number: leaf_header.number }; let allowed_relay_parents = AllowedRelayParents { - minimum_relay_parents: min_relay_parents_raw.iter().cloned().collect(), + minimum_relay_parents: min_relay_parents.into_iter().collect(), allowed_relay_parents_contiguous: ancestry, }; @@ -408,12 +503,12 @@ mod tests { use crate::TimeoutExt; use assert_matches::assert_matches; use futures::future::{join, FutureExt}; - use polkadot_node_subsystem::AllMessages; + use polkadot_node_subsystem::{messages::RuntimeApiRequest, AllMessages}; use polkadot_node_subsystem_test_helpers::{ make_subsystem_context, TestSubsystemContextHandle, }; use polkadot_overseer::SubsystemContext; - use polkadot_primitives::Header; + use polkadot_primitives::{AsyncBackingParams, Header}; use sp_core::testing::TaskExecutor; use std::time::Duration; @@ -514,6 +609,71 @@ mod tests { ); } + async fn assert_async_backing_params_request( + virtual_overseer: &mut VirtualOverseer, + leaf: Hash, + params: AsyncBackingParams, + ) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + leaf_hash, + RuntimeApiRequest::AsyncBackingParams( + tx + ) + ) + ) => { + assert_eq!(leaf, leaf_hash, "received unexpected leaf hash"); + tx.send(Ok(params)).unwrap(); + } + ); + } + + async fn assert_session_index_request( + virtual_overseer: &mut VirtualOverseer, + leaf: Hash, + session: u32, + ) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + leaf_hash, + RuntimeApiRequest::SessionIndexForChild( + tx + ) + ) + ) => { + assert_eq!(leaf, leaf_hash, "received unexpected leaf hash"); + tx.send(Ok(session)).unwrap(); + } + ); + } + + async fn assert_ancestors_request( + virtual_overseer: &mut VirtualOverseer, + leaf: Hash, + expected_ancestor_len: u32, + response: Vec, + ) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ChainApi( + ChainApiMessage::Ancestors { + hash: leaf_hash, + k, + response_channel: tx + } + ) => { + assert_eq!(leaf, leaf_hash, "received unexpected leaf hash"); + assert_eq!(k, expected_ancestor_len as usize); + + tx.send(Ok(response)).unwrap(); + } + ); + } + #[test] fn construct_fresh_view() { let pool = TaskExecutor::new(); @@ -521,6 +681,8 @@ mod tests { let mut view = View::default(); + assert_eq!(view.collating_for, None); + // Chain B. const PARA_A_MIN_PARENT: u32 = 4; const PARA_B_MIN_PARENT: u32 = 3; @@ -528,15 +690,17 @@ mod tests { let prospective_response = vec![(PARA_A, PARA_A_MIN_PARENT), (PARA_B, PARA_B_MIN_PARENT)]; let leaf = CHAIN_B.last().unwrap(); + let leaf_idx = CHAIN_B.len() - 1; let min_min_idx = (PARA_B_MIN_PARENT - GENESIS_NUMBER - 1) as usize; let fut = view.activate_leaf(ctx.sender(), *leaf).timeout(TIMEOUT).map(|res| { - let paras = res.expect("`activate_leaf` timed out").unwrap(); - assert_eq!(paras, vec![PARA_A, PARA_B]); + res.expect("`activate_leaf` timed out").unwrap(); }); let overseer_fut = async { + assert_block_header_requests(&mut ctx_handle, CHAIN_B, &CHAIN_B[leaf_idx..]).await; assert_min_relay_parents_request(&mut ctx_handle, leaf, prospective_response).await; - assert_block_header_requests(&mut ctx_handle, CHAIN_B, &CHAIN_B[min_min_idx..]).await; + assert_block_header_requests(&mut ctx_handle, CHAIN_B, &CHAIN_B[min_min_idx..leaf_idx]) + .await; }; futures::executor::block_on(join(fut, overseer_fut)); @@ -558,6 +722,11 @@ mod tests { allowed_relay_parents.allowed_relay_parents_contiguous, expected_ancestry ); + + assert_eq!(view.known_allowed_relay_parents_under(&leaf, None), Some(&expected_ancestry[..])); + assert_eq!(view.known_allowed_relay_parents_under(&leaf, Some(PARA_A)), Some(&expected_ancestry[..(PARA_A_MIN_PARENT - 1) as usize])); + assert_eq!(view.known_allowed_relay_parents_under(&leaf, Some(PARA_B)), Some(&expected_ancestry[..])); + assert!(view.known_allowed_relay_parents_under(&leaf, Some(PARA_C)).unwrap().is_empty()); } ); @@ -566,18 +735,188 @@ mod tests { let prospective_response = vec![(PARA_C, PARA_C_MIN_PARENT)]; let leaf = CHAIN_A.last().unwrap(); let blocks = [&[GENESIS_HASH], CHAIN_A].concat(); + let leaf_idx = blocks.len() - 1; let fut = view.activate_leaf(ctx.sender(), *leaf).timeout(TIMEOUT).map(|res| { - let paras = res.expect("`activate_leaf` timed out").unwrap(); - assert_eq!(paras, vec![PARA_C]); + res.expect("`activate_leaf` timed out").unwrap(); }); let overseer_fut = async { + assert_block_header_requests(&mut ctx_handle, CHAIN_A, &blocks[leaf_idx..]).await; assert_min_relay_parents_request(&mut ctx_handle, leaf, prospective_response).await; - assert_block_header_requests(&mut ctx_handle, CHAIN_A, &blocks).await; + assert_block_header_requests(&mut ctx_handle, CHAIN_A, &blocks[..leaf_idx]).await; + }; + futures::executor::block_on(join(fut, overseer_fut)); + + assert_eq!(view.leaves.len(), 2); + + let leaf_info = + view.block_info_storage.get(leaf).expect("block must be present in storage"); + assert_matches!( + leaf_info.maybe_allowed_relay_parents, + Some(ref allowed_relay_parents) => { + assert_eq!(allowed_relay_parents.minimum_relay_parents[&PARA_C], GENESIS_NUMBER); + let expected_ancestry: Vec = + blocks[..].iter().rev().copied().collect(); + assert_eq!( + allowed_relay_parents.allowed_relay_parents_contiguous, + expected_ancestry + ); + + assert_eq!(view.known_allowed_relay_parents_under(&leaf, None), Some(&expected_ancestry[..])); + assert_eq!(view.known_allowed_relay_parents_under(&leaf, Some(PARA_C)), Some(&expected_ancestry[..])); + + assert!(view.known_allowed_relay_parents_under(&leaf, Some(PARA_A)).unwrap().is_empty()); + assert!(view.known_allowed_relay_parents_under(&leaf, Some(PARA_B)).unwrap().is_empty()); + } + ); + } + + #[test] + fn construct_fresh_view_single_para() { + let pool = TaskExecutor::new(); + let (mut ctx, mut ctx_handle) = make_subsystem_context::(pool); + + let mut view = View::new(Some(PARA_A)); + + assert_eq!(view.collating_for, Some(PARA_A)); + + // Chain B. + const PARA_A_MIN_PARENT: u32 = 4; + + let current_session = 2; + + let leaf = CHAIN_B.last().unwrap(); + let leaf_idx = CHAIN_B.len() - 1; + let min_min_idx = (PARA_A_MIN_PARENT - GENESIS_NUMBER - 1) as usize; + + let fut = view.activate_leaf(ctx.sender(), *leaf).timeout(TIMEOUT).map(|res| { + res.expect("`activate_leaf` timed out").unwrap(); + }); + let overseer_fut = async { + assert_block_header_requests(&mut ctx_handle, CHAIN_B, &CHAIN_B[leaf_idx..]).await; + + assert_async_backing_params_request( + &mut ctx_handle, + *leaf, + AsyncBackingParams { + max_candidate_depth: 0, + allowed_ancestry_len: PARA_A_MIN_PARENT, + }, + ) + .await; + + assert_session_index_request(&mut ctx_handle, *leaf, current_session).await; + + assert_ancestors_request( + &mut ctx_handle, + *leaf, + PARA_A_MIN_PARENT, + CHAIN_B[min_min_idx..leaf_idx].iter().copied().rev().collect(), + ) + .await; + + for hash in CHAIN_B[min_min_idx..leaf_idx].into_iter().rev() { + assert_session_index_request(&mut ctx_handle, *hash, current_session).await; + } + + assert_block_header_requests(&mut ctx_handle, CHAIN_B, &CHAIN_B[min_min_idx..leaf_idx]) + .await; }; futures::executor::block_on(join(fut, overseer_fut)); + for i in min_min_idx..(CHAIN_B.len() - 1) { + // No allowed relay parents constructed for ancestry. + assert!(view.known_allowed_relay_parents_under(&CHAIN_B[i], None).is_none()); + } + + let leaf_info = + view.block_info_storage.get(leaf).expect("block must be present in storage"); + assert_matches!( + leaf_info.maybe_allowed_relay_parents, + Some(ref allowed_relay_parents) => { + assert_eq!(allowed_relay_parents.minimum_relay_parents[&PARA_A], PARA_A_MIN_PARENT); + let expected_ancestry: Vec = + CHAIN_B[min_min_idx..].iter().rev().copied().collect(); + assert_eq!( + allowed_relay_parents.allowed_relay_parents_contiguous, + expected_ancestry + ); + + assert_eq!(view.known_allowed_relay_parents_under(&leaf, None), Some(&expected_ancestry[..])); + assert_eq!(view.known_allowed_relay_parents_under(&leaf, Some(PARA_A)), Some(&expected_ancestry[..])); + + assert!(view.known_allowed_relay_parents_under(&leaf, Some(PARA_B)).unwrap().is_empty()); + assert!(view.known_allowed_relay_parents_under(&leaf, Some(PARA_C)).unwrap().is_empty()); + } + ); + + // Suppose the whole test chain A is allowed up to genesis for para A, but the genesis block + // is in a different session. + let leaf = CHAIN_A.last().unwrap(); + let blocks = [&[GENESIS_HASH], CHAIN_A].concat(); + let leaf_idx = blocks.len() - 1; + + let fut = view.activate_leaf(ctx.sender(), *leaf).timeout(TIMEOUT).map(|res| { + res.expect("`activate_leaf` timed out").unwrap(); + }); + + let overseer_fut = async { + assert_block_header_requests(&mut ctx_handle, CHAIN_A, &blocks[leaf_idx..]).await; + + assert_async_backing_params_request( + &mut ctx_handle, + *leaf, + AsyncBackingParams { + max_candidate_depth: 0, + allowed_ancestry_len: blocks.len() as u32, + }, + ) + .await; + + assert_session_index_request(&mut ctx_handle, *leaf, current_session).await; + + assert_ancestors_request( + &mut ctx_handle, + *leaf, + blocks.len() as u32, + blocks[..leaf_idx].iter().rev().copied().collect(), + ) + .await; + + for hash in blocks[1..leaf_idx].into_iter().rev() { + assert_session_index_request(&mut ctx_handle, *hash, current_session).await; + } + + assert_session_index_request(&mut ctx_handle, GENESIS_HASH, 0).await; + + // We won't request for the genesis block + assert_block_header_requests(&mut ctx_handle, CHAIN_A, &blocks[1..leaf_idx]).await; + }; + + futures::executor::block_on(join(fut, overseer_fut)); + assert_eq!(view.leaves.len(), 2); + + let leaf_info = + view.block_info_storage.get(leaf).expect("block must be present in storage"); + assert_matches!( + leaf_info.maybe_allowed_relay_parents, + Some(ref allowed_relay_parents) => { + assert_eq!(allowed_relay_parents.minimum_relay_parents[&PARA_A], 1); + let expected_ancestry: Vec = + CHAIN_A[..].iter().rev().copied().collect(); + assert_eq!( + allowed_relay_parents.allowed_relay_parents_contiguous, + expected_ancestry + ); + + assert_eq!(view.known_allowed_relay_parents_under(&leaf, None), Some(&expected_ancestry[..])); + assert_eq!(view.known_allowed_relay_parents_under(&leaf, Some(PARA_A)), Some(&expected_ancestry[..])); + + assert!(view.known_allowed_relay_parents_under(&leaf, Some(PARA_B)).unwrap().is_empty()); + assert!(view.known_allowed_relay_parents_under(&leaf, Some(PARA_C)).unwrap().is_empty()); + } + ); } #[test] @@ -595,15 +934,20 @@ mod tests { let prospective_response = vec![(PARA_A, PARA_A_MIN_PARENT)]; let fut = view.activate_leaf(ctx.sender(), leaf_a).timeout(TIMEOUT).map(|res| { - let paras = res.expect("`activate_leaf` timed out").unwrap(); - assert_eq!(paras, vec![PARA_A]); + res.expect("`activate_leaf` timed out").unwrap(); }); let overseer_fut = async { + assert_block_header_requests( + &mut ctx_handle, + CHAIN_B, + &CHAIN_B[(leaf_a_number - 1)..leaf_a_number], + ) + .await; assert_min_relay_parents_request(&mut ctx_handle, &leaf_a, prospective_response).await; assert_block_header_requests( &mut ctx_handle, CHAIN_B, - &CHAIN_B[min_min_idx..leaf_a_number], + &CHAIN_B[min_min_idx..(leaf_a_number - 1)], ) .await; }; @@ -617,15 +961,20 @@ mod tests { let prospective_response = vec![(PARA_B, PARA_B_MIN_PARENT)]; let fut = view.activate_leaf(ctx.sender(), leaf_b).timeout(TIMEOUT).map(|res| { - let paras = res.expect("`activate_leaf` timed out").unwrap(); - assert_eq!(paras, vec![PARA_B]); + res.expect("`activate_leaf` timed out").unwrap(); }); let overseer_fut = async { + assert_block_header_requests( + &mut ctx_handle, + CHAIN_B, + &CHAIN_B[(leaf_b_number - 1)..leaf_b_number], + ) + .await; assert_min_relay_parents_request(&mut ctx_handle, &leaf_b, prospective_response).await; assert_block_header_requests( &mut ctx_handle, CHAIN_B, - &CHAIN_B[leaf_a_number..leaf_b_number], // Note the expected range. + &CHAIN_B[leaf_a_number..(leaf_b_number - 1)], // Note the expected range. ) .await; }; @@ -665,13 +1014,15 @@ mod tests { .timeout(TIMEOUT) .map(|res| res.unwrap().unwrap()); let overseer_fut = async { - assert_min_relay_parents_request(&mut ctx_handle, &leaf_a, prospective_response).await; assert_block_header_requests( &mut ctx_handle, CHAIN_B, - &CHAIN_B[min_a_idx..=leaf_a_idx], + &CHAIN_B[leaf_a_idx..(leaf_a_idx + 1)], ) .await; + assert_min_relay_parents_request(&mut ctx_handle, &leaf_a, prospective_response).await; + assert_block_header_requests(&mut ctx_handle, CHAIN_B, &CHAIN_B[min_a_idx..leaf_a_idx]) + .await; }; futures::executor::block_on(join(fut, overseer_fut)); @@ -689,8 +1040,11 @@ mod tests { .timeout(TIMEOUT) .map(|res| res.expect("`activate_leaf` timed out").unwrap()); let overseer_fut = async { + assert_block_header_requests(&mut ctx_handle, CHAIN_B, &blocks[(blocks.len() - 1)..]) + .await; assert_min_relay_parents_request(&mut ctx_handle, &leaf_b, prospective_response).await; - assert_block_header_requests(&mut ctx_handle, CHAIN_B, blocks).await; + assert_block_header_requests(&mut ctx_handle, CHAIN_B, &blocks[..(blocks.len() - 1)]) + .await; }; futures::executor::block_on(join(fut, overseer_fut)); @@ -721,19 +1075,18 @@ mod tests { let prospective_response = vec![(PARA_A, PARA_A_MIN_PARENT)]; let fut = view.activate_leaf(ctx.sender(), GENESIS_HASH).timeout(TIMEOUT).map(|res| { - let paras = res.expect("`activate_leaf` timed out").unwrap(); - assert_eq!(paras, vec![PARA_A]); + res.expect("`activate_leaf` timed out").unwrap(); }); let overseer_fut = async { + assert_block_header_requests(&mut ctx_handle, &[GENESIS_HASH], &[GENESIS_HASH]).await; assert_min_relay_parents_request(&mut ctx_handle, &GENESIS_HASH, prospective_response) .await; - assert_block_header_requests(&mut ctx_handle, &[GENESIS_HASH], &[GENESIS_HASH]).await; }; futures::executor::block_on(join(fut, overseer_fut)); assert_matches!( view.known_allowed_relay_parents_under(&GENESIS_HASH, None), - Some(hashes) if !hashes.is_empty() + Some(hashes) if hashes == &[GENESIS_HASH] ); } } diff --git a/prdoc/pr_4471.prdoc b/prdoc/pr_4471.prdoc new file mode 100644 index 0000000000000..6d589be81fd96 --- /dev/null +++ b/prdoc/pr_4471.prdoc @@ -0,0 +1,16 @@ +title: "Remove prospective-parachains subsystem from collator nodes" + +doc: + - audience: Node Dev + description: | + Removes the prospective-parachains subsystem from collators. The GetMinimumRelayParents of the implicit view + is replaced by direct ChainAPI and runtime calls. The subsystem was causing performance problems when collating + connected to an RPC node, due to the high number of runtime API calls, which were unneccessary for a collator. + +crates: + - name: polkadot-collator-protocol + bump: minor + - name: polkadot-service + bump: minor + - name: polkadot-node-subsystem-util + bump: minor