Skip to content

Commit

Permalink
Remove the prospective-parachains subsystem from collators (paritytec…
Browse files Browse the repository at this point in the history
…h#4471)

Implements paritytech#4429

Collators only need to maintain the implicit view for the paraid they
are collating on.
In this case, bypass prospective-parachains entirely. It's still useful
to use the GetMinimumRelayParents message from prospective-parachains
for validators, because the data is already present there.

This enables us to entirely remove the subsystem from collators, which
consumed resources needlessly

Aims to resolve paritytech#4167 

TODO:
- [x] fix unit tests
  • Loading branch information
alindima authored and hitchhooker committed Jun 5, 2024
1 parent bb28733 commit 1892aa1
Show file tree
Hide file tree
Showing 8 changed files with 581 additions and 129 deletions.
21 changes: 12 additions & 9 deletions polkadot/node/core/backing/src/tests/prospective_parachains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,6 @@ async fn activate_leaf(
.min()
.unwrap_or(&leaf_number);

assert_matches!(
virtual_overseer.recv().await,
AllMessages::ProspectiveParachains(
ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx)
) if parent == leaf_hash => {
tx.send(min_relay_parents).unwrap();
}
);

let ancestry_len = leaf_number + 1 - min_min;

let ancestry_hashes = std::iter::successors(Some(leaf_hash), |h| Some(get_parent_hash(*h)))
Expand Down Expand Up @@ -117,6 +108,18 @@ async fn activate_leaf(
tx.send(Ok(Some(header))).unwrap();
}
);

if requested_len == 0 {
assert_matches!(
virtual_overseer.recv().await,
AllMessages::ProspectiveParachains(
ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx)
) if parent == leaf_hash => {
tx.send(min_relay_parents.clone()).unwrap();
}
);
}

requested_len += 1;
}
}
Expand Down
58 changes: 35 additions & 23 deletions polkadot/node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,9 @@ struct State {
/// never included in the fragment chains of active leaves which do. In
/// particular, this means that if a given relay parent belongs to implicit
/// ancestry of some active leaf, then it does support prospective parachains.
implicit_view: ImplicitView,
///
/// It's `None` if the collator is not yet collating for a paraid.
implicit_view: Option<ImplicitView>,

/// All active leaves observed by us, including both that do and do not
/// support prospective parachains. This mapping works as a replacement for
Expand Down Expand Up @@ -334,7 +336,7 @@ impl State {
metrics,
collating_on: Default::default(),
peer_data: Default::default(),
implicit_view: Default::default(),
implicit_view: None,
active_leaves: Default::default(),
per_relay_parent: Default::default(),
span_per_relay_parent: Default::default(),
Expand Down Expand Up @@ -539,11 +541,12 @@ async fn distribute_collation<Context>(
.filter(|(_, PeerData { view: v, .. })| match relay_parent_mode {
ProspectiveParachainsMode::Disabled => v.contains(&candidate_relay_parent),
ProspectiveParachainsMode::Enabled { .. } => v.iter().any(|block_hash| {
state
.implicit_view
.known_allowed_relay_parents_under(block_hash, Some(id))
.unwrap_or_default()
.contains(&candidate_relay_parent)
state.implicit_view.as_ref().map(|implicit_view| {
implicit_view
.known_allowed_relay_parents_under(block_hash, Some(id))
.unwrap_or_default()
.contains(&candidate_relay_parent)
}) == Some(true)
}),
});

Expand Down Expand Up @@ -830,6 +833,7 @@ async fn process_msg<Context>(
match msg {
CollateOn(id) => {
state.collating_on = Some(id);
state.implicit_view = Some(ImplicitView::new(Some(id)));
},
DistributeCollation {
candidate_receipt,
Expand Down Expand Up @@ -1215,7 +1219,10 @@ async fn handle_peer_view_change<Context>(
Some(ProspectiveParachainsMode::Disabled) => std::slice::from_ref(&added),
Some(ProspectiveParachainsMode::Enabled { .. }) => state
.implicit_view
.known_allowed_relay_parents_under(&added, state.collating_on)
.as_ref()
.and_then(|implicit_view| {
implicit_view.known_allowed_relay_parents_under(&added, state.collating_on)
})
.unwrap_or_default(),
None => {
gum::trace!(
Expand Down Expand Up @@ -1353,21 +1360,22 @@ where
state.per_relay_parent.insert(*leaf, PerRelayParent::new(mode));

if mode.is_enabled() {
state
.implicit_view
.activate_leaf(sender, *leaf)
.await
.map_err(Error::ImplicitViewFetchError)?;
if let Some(ref mut implicit_view) = state.implicit_view {
implicit_view
.activate_leaf(sender, *leaf)
.await
.map_err(Error::ImplicitViewFetchError)?;

let allowed_ancestry = state
.implicit_view
.known_allowed_relay_parents_under(leaf, state.collating_on)
.unwrap_or_default();
for block_hash in allowed_ancestry {
state
.per_relay_parent
.entry(*block_hash)
.or_insert_with(|| PerRelayParent::new(mode));
let allowed_ancestry = implicit_view
.known_allowed_relay_parents_under(leaf, state.collating_on)
.unwrap_or_default();

for block_hash in allowed_ancestry {
state
.per_relay_parent
.entry(*block_hash)
.or_insert_with(|| PerRelayParent::new(mode));
}
}
}
}
Expand All @@ -1378,7 +1386,11 @@ where
// of implicit ancestry. Only update the state after the hash is actually
// pruned from the block info storage.
let pruned = if mode.is_enabled() {
state.implicit_view.deactivate_leaf(*leaf)
state
.implicit_view
.as_mut()
.map(|view| view.deactivate_leaf(*leaf))
.unwrap_or_default()
} else {
vec![*leaf]
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use super::*;

use polkadot_node_subsystem::messages::{ChainApiMessage, ProspectiveParachainsMessage};
use polkadot_node_subsystem::messages::ChainApiMessage;
use polkadot_primitives::{AsyncBackingParams, Header, OccupiedCore};

const ASYNC_BACKING_PARAMETERS: AsyncBackingParams =
Expand All @@ -31,7 +31,6 @@ fn get_parent_hash(hash: Hash) -> Hash {
/// Handle a view update.
async fn update_view(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
new_view: Vec<(Hash, u32)>, // Hash and block number.
activated: u8, // How many new heads does this update contain?
) {
Expand Down Expand Up @@ -61,21 +60,88 @@ async fn update_view(

let min_number = leaf_number.saturating_sub(ASYNC_BACKING_PARAMETERS.allowed_ancestry_len);

assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ProspectiveParachains(
ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx),
) if parent == leaf_hash => {
tx.send(vec![(test_state.para_id, min_number)]).unwrap();
}
);

let ancestry_len = leaf_number + 1 - min_number;
let ancestry_hashes = std::iter::successors(Some(leaf_hash), |h| Some(get_parent_hash(*h)))
.take(ancestry_len as usize);
let ancestry_numbers = (min_number..=leaf_number).rev();
let mut ancestry_iter = ancestry_hashes.clone().zip(ancestry_numbers).peekable();

if let Some((hash, number)) = ancestry_iter.next() {
assert_matches!(
overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await.unwrap(),
AllMessages::ChainApi(ChainApiMessage::BlockHeader(.., tx)) => {
let header = Header {
parent_hash: get_parent_hash(hash),
number,
state_root: Hash::zero(),
extrinsics_root: Hash::zero(),
digest: Default::default(),
};

tx.send(Ok(Some(header))).unwrap();
}
);

assert_matches!(
overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await.unwrap(),
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
..,
RuntimeApiRequest::AsyncBackingParams(
tx
)
)
) => {
tx.send(Ok(ASYNC_BACKING_PARAMETERS)).unwrap();
}
);

assert_matches!(
overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await.unwrap(),
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
..,
RuntimeApiRequest::SessionIndexForChild(
tx
)
)
) => {
tx.send(Ok(1)).unwrap();
}
);

assert_matches!(
overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await.unwrap(),
AllMessages::ChainApi(
ChainApiMessage::Ancestors {
k,
response_channel: tx,
..
}
) => {
assert_eq!(k, ASYNC_BACKING_PARAMETERS.allowed_ancestry_len as usize);

tx.send(Ok(ancestry_hashes.clone().skip(1).into_iter().collect())).unwrap();
}
);
}

for _ in ancestry_iter.clone() {
assert_matches!(
overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await.unwrap(),
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
..,
RuntimeApiRequest::SessionIndexForChild(
tx
)
)
) => {
tx.send(Ok(1)).unwrap();
}
);
}

while let Some((hash, number)) = ancestry_iter.next() {
// May be `None` for the last element.
let parent_hash =
Expand Down Expand Up @@ -195,7 +261,7 @@ fn distribute_collation_from_implicit_view() {
overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id))
.await;
// Activated leaf is `b`, but the collation will be based on `c`.
update_view(virtual_overseer, &test_state, vec![(head_b, head_b_num)], 1).await;
update_view(virtual_overseer, vec![(head_b, head_b_num)], 1).await;

let validator_peer_ids = test_state.current_group_validator_peer_ids();
for (val, peer) in test_state
Expand Down Expand Up @@ -258,7 +324,7 @@ fn distribute_collation_from_implicit_view() {

// Head `c` goes out of view.
// Build a different candidate for this relay parent and attempt to distribute it.
update_view(virtual_overseer, &test_state, vec![(head_a, head_a_num)], 1).await;
update_view(virtual_overseer, vec![(head_a, head_a_num)], 1).await;

let pov = PoV { block_data: BlockData(vec![4, 5, 6]) };
let parent_head_data_hash = Hash::repeat_byte(0xBB);
Expand Down Expand Up @@ -318,7 +384,7 @@ fn distribute_collation_up_to_limit() {
overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id))
.await;
// Activated leaf is `a`, but the collation will be based on `b`.
update_view(virtual_overseer, &test_state, vec![(head_a, head_a_num)], 1).await;
update_view(virtual_overseer, vec![(head_a, head_a_num)], 1).await;

for i in 0..(ASYNC_BACKING_PARAMETERS.max_candidate_depth + 1) {
let pov = PoV { block_data: BlockData(vec![i as u8]) };
Expand Down Expand Up @@ -402,7 +468,7 @@ fn send_parent_head_data_for_elastic_scaling() {
CollatorProtocolMessage::CollateOn(test_state.para_id),
)
.await;
update_view(&mut virtual_overseer, &test_state, vec![(head_b, head_b_num)], 1).await;
update_view(&mut virtual_overseer, vec![(head_b, head_b_num)], 1).await;

let pov_data = PoV { block_data: BlockData(vec![1 as u8]) };
let candidate = TestCandidateBuilder {
Expand Down Expand Up @@ -517,8 +583,8 @@ fn advertise_and_send_collation_by_hash() {
CollatorProtocolMessage::CollateOn(test_state.para_id),
)
.await;
update_view(&mut virtual_overseer, &test_state, vec![(head_b, head_b_num)], 1).await;
update_view(&mut virtual_overseer, &test_state, vec![(head_a, head_a_num)], 1).await;
update_view(&mut virtual_overseer, vec![(head_b, head_b_num)], 1).await;
update_view(&mut virtual_overseer, vec![(head_a, head_a_num)], 1).await;

let candidates: Vec<_> = (0..2)
.map(|i| {
Expand Down Expand Up @@ -638,7 +704,7 @@ fn advertise_core_occupied() {
overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id))
.await;
// Activated leaf is `a`, but the collation will be based on `b`.
update_view(virtual_overseer, &test_state, vec![(head_a, head_a_num)], 1).await;
update_view(virtual_overseer, vec![(head_a, head_a_num)], 1).await;

let pov = PoV { block_data: BlockData(vec![1, 2, 3]) };
let candidate = TestCandidateBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,6 @@ pub(super) async fn update_view(

let min_number = leaf_number.saturating_sub(ASYNC_BACKING_PARAMETERS.allowed_ancestry_len);

assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ProspectiveParachains(
ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx),
) if parent == leaf_hash => {
tx.send(test_state.chain_ids.iter().map(|para_id| (*para_id, min_number)).collect()).unwrap();
}
);

let ancestry_len = leaf_number + 1 - min_number;
let ancestry_hashes = std::iter::successors(Some(leaf_hash), |h| Some(get_parent_hash(*h)))
.take(ancestry_len as usize);
Expand Down Expand Up @@ -166,6 +157,17 @@ pub(super) async fn update_view(
}
);

if requested_len == 0 {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ProspectiveParachains(
ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx),
) if parent == leaf_hash => {
tx.send(test_state.chain_ids.iter().map(|para_id| (*para_id, min_number)).collect()).unwrap();
}
);
}

requested_len += 1;
}
}
Expand Down
26 changes: 13 additions & 13 deletions polkadot/node/network/statement-distribution/src/v2/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,19 +586,6 @@ async fn handle_leaf_activation(
}
);

let mrp_response: Vec<(ParaId, BlockNumber)> = para_data
.iter()
.map(|(para_id, data)| (*para_id, data.min_relay_parent))
.collect();
assert_matches!(
virtual_overseer.recv().await,
AllMessages::ProspectiveParachains(
ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx)
) if parent == *hash => {
tx.send(mrp_response).unwrap();
}
);

let header = Header {
parent_hash: *parent_hash,
number: *number,
Expand All @@ -615,6 +602,19 @@ async fn handle_leaf_activation(
}
);

let mrp_response: Vec<(ParaId, BlockNumber)> = para_data
.iter()
.map(|(para_id, data)| (*para_id, data.min_relay_parent))
.collect();
assert_matches!(
virtual_overseer.recv().await,
AllMessages::ProspectiveParachains(
ProspectiveParachainsMessage::GetMinimumRelayParents(parent, tx)
) if parent == *hash => {
tx.send(mrp_response).unwrap();
}
);

loop {
match virtual_overseer.recv().await {
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
Expand Down
Loading

0 comments on commit 1892aa1

Please sign in to comment.