Skip to content

Commit

Permalink
statement-distribution: fix filtering of statements for elastic parac…
Browse files Browse the repository at this point in the history
…hains (paritytech#3879)

fixes paritytech#3775

Additionally moves the claim queue fetch utilities into
`subsystem-util`.

TODO:
- [x] fix tests
- [x] add elastic scaling tests

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
  • Loading branch information
sandreim authored and dharjeezy committed Apr 9, 2024
1 parent 1454fde commit 0663172
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 98 deletions.
2 changes: 2 additions & 0 deletions polkadot/node/collation-generation/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum Error {
#[error(transparent)]
Util(#[from] polkadot_node_subsystem_util::Error),
#[error(transparent)]
UtilRuntime(#[from] polkadot_node_subsystem_util::runtime::Error),
#[error(transparent)]
Erasure(#[from] polkadot_erasure_coding::Error),
#[error("Parachain backing state not available in runtime.")]
MissingParaBackingState,
Expand Down
54 changes: 10 additions & 44 deletions polkadot/node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,23 @@ use polkadot_node_primitives::{
SubmitCollationParams,
};
use polkadot_node_subsystem::{
messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiRequest},
messages::{CollationGenerationMessage, CollatorProtocolMessage},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
SubsystemContext, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{
has_required_runtime, request_async_backing_params, request_availability_cores,
request_claim_queue, request_para_backing_state, request_persisted_validation_data,
request_validation_code, request_validation_code_hash, request_validators,
request_async_backing_params, request_availability_cores, request_para_backing_state,
request_persisted_validation_data, request_validation_code, request_validation_code_hash,
request_validators,
vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core},
};
use polkadot_primitives::{
collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt,
CollatorPair, CoreIndex, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption,
PersistedValidationData, ScheduledCore, ValidationCodeHash,
PersistedValidationData, ValidationCodeHash,
};
use sp_core::crypto::Pair;
use std::{
collections::{BTreeMap, VecDeque},
sync::Arc,
};
use std::sync::Arc;

mod error;

Expand Down Expand Up @@ -228,7 +226,9 @@ async fn handle_new_activations<Context>(
let availability_cores = availability_cores??;
let async_backing_params = async_backing_params?.ok();
let n_validators = validators??.len();
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?;
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent)
.await
.map_err(crate::error::Error::UtilRuntime)?;

// The loop bellow will fill in cores that the para is allowed to build on.
let mut cores_to_build_on = Vec::new();
Expand Down Expand Up @@ -655,37 +655,3 @@ fn erasure_root(
let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
Ok(polkadot_erasure_coding::branches(&chunks).root())
}

// Checks if the runtime supports `request_claim_queue` and executes it. Returns `Ok(None)`
// otherwise. Any [`RuntimeApiError`]s are bubbled up to the caller.
async fn fetch_claim_queue(
sender: &mut impl overseer::CollationGenerationSenderTrait,
relay_parent: Hash,
) -> crate::error::Result<Option<BTreeMap<CoreIndex, VecDeque<ParaId>>>> {
if has_required_runtime(
sender,
relay_parent,
RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
)
.await
{
let res = request_claim_queue(relay_parent, sender).await.await??;
Ok(Some(res))
} else {
gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`");
Ok(None)
}
}

// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`.
// This function is supposed to be used in `handle_new_activations` hence the return type.
fn fetch_next_scheduled_on_core(
claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>,
core_idx: CoreIndex,
) -> Option<ScheduledCore> {
claim_queue
.get(&core_idx)?
.front()
.cloned()
.map(|para_id| ScheduledCore { para_id, collator: None })
}
15 changes: 9 additions & 6 deletions polkadot/node/collation-generation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ use polkadot_node_subsystem::{
ActivatedLeaf,
};
use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_node_subsystem_util::{vstaging::ClaimQueueSnapshot, TimeoutExt};
use polkadot_primitives::{
async_backing::{BackingState, CandidatePendingAvailability},
AsyncBackingParams, BlockNumber, CollatorPair, HeadData, PersistedValidationData,
ScheduledCore, ValidationCode,
};
use rstest::rstest;
use sp_keyring::sr25519::Keyring as Sr25519Keyring;
use std::pin::Pin;
use std::{
collections::{BTreeMap, VecDeque},
pin::Pin,
};
use test_helpers::{
dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator, make_candidate,
};
Expand Down Expand Up @@ -617,7 +620,7 @@ fn fallback_when_no_validation_code_hash_api(#[case] runtime_version: u32) {
_hash,
RuntimeApiRequest::ClaimQueue(tx),
))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
let res = BTreeMap::<CoreIndex, VecDeque<ParaId>>::new();
let res = ClaimQueueSnapshot::new();
tx.send(Ok(res)).unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
Expand Down Expand Up @@ -780,7 +783,7 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run
candidate_hash: Default::default(),
candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
})];
let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);

test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
Expand Down Expand Up @@ -962,7 +965,7 @@ fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled(
candidate_hash: Default::default(),
candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
})];
let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);

test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
Expand Down Expand Up @@ -1050,7 +1053,7 @@ mod helpers {
async_backing_params: AsyncBackingParams,
cores: Vec<CoreState>,
runtime_version: u32,
claim_queue: BTreeMap<CoreIndex, VecDeque<ParaId>>,
claim_queue: ClaimQueueSnapshot,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
Expand Down
3 changes: 3 additions & 0 deletions polkadot/node/network/statement-distribution/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ pub enum Error {
#[error("Fetching validator groups failed {0:?}")]
FetchValidatorGroups(RuntimeApiError),

#[error("Fetching claim queue failed {0:?}")]
FetchClaimQueue(runtime::Error),

#[error("Attempted to share statement when not a validator or not assigned")]
InvalidShare,

Expand Down
131 changes: 95 additions & 36 deletions polkadot/node/network/statement-distribution/src/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use polkadot_node_subsystem_util::{
backing_implicit_view::View as ImplicitView,
reputation::ReputationAggregator,
runtime::{request_min_backing_votes, ProspectiveParachainsMode},
vstaging::fetch_claim_queue,
};
use polkadot_primitives::{
AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, CoreState, GroupIndex,
Expand Down Expand Up @@ -149,10 +150,9 @@ pub(crate) const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1);
struct PerRelayParentState {
local_validator: Option<LocalValidatorState>,
statement_store: StatementStore,
availability_cores: Vec<CoreState>,
group_rotation_info: GroupRotationInfo,
seconding_limit: usize,
session: SessionIndex,
groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
}

impl PerRelayParentState {
Expand Down Expand Up @@ -563,11 +563,13 @@ pub(crate) async fn handle_active_leaves_update<Context>(
activated: &ActivatedLeaf,
leaf_mode: ProspectiveParachainsMode,
) -> JfyiErrorResult<()> {
let seconding_limit = match leaf_mode {
let max_candidate_depth = match leaf_mode {
ProspectiveParachainsMode::Disabled => return Ok(()),
ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth + 1,
ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth,
};

let seconding_limit = max_candidate_depth + 1;

state
.implicit_view
.activate_leaf(ctx.sender(), activated.hash)
Expand Down Expand Up @@ -693,15 +695,23 @@ pub(crate) async fn handle_active_leaves_update<Context>(
}
});

let groups_per_para = determine_groups_per_para(
ctx.sender(),
new_relay_parent,
availability_cores,
group_rotation_info,
max_candidate_depth,
)
.await;

state.per_relay_parent.insert(
new_relay_parent,
PerRelayParentState {
local_validator,
statement_store: StatementStore::new(&per_session.groups),
availability_cores,
group_rotation_info,
seconding_limit,
session: session_index,
groups_per_para,
},
);
}
Expand Down Expand Up @@ -2126,17 +2136,64 @@ async fn provide_candidate_to_grid<Context>(
}
}

fn group_for_para(
availability_cores: &[CoreState],
group_rotation_info: &GroupRotationInfo,
para_id: ParaId,
) -> Option<GroupIndex> {
// Note: this won't work well for on-demand parachains as it assumes that core assignments are
// fixed across blocks.
let core_index = availability_cores.iter().position(|c| c.para_id() == Some(para_id));
// Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings.
async fn determine_groups_per_para(
sender: &mut impl overseer::StatementDistributionSenderTrait,
relay_parent: Hash,
availability_cores: Vec<CoreState>,
group_rotation_info: GroupRotationInfo,
max_candidate_depth: usize,
) -> HashMap<ParaId, Vec<GroupIndex>> {
let maybe_claim_queue = fetch_claim_queue(sender, relay_parent)
.await
.unwrap_or_else(|err| {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
?err,
"determine_groups_per_para: `claim_queue` API not available, falling back to iterating availability cores"
);
None
});

let n_cores = availability_cores.len();

// Determine the core indices occupied by each para at the current relay parent. To support
// on-demand parachains we also consider the core indices at next block if core has a candidate
// pending availability.
let para_core_indices: Vec<_> = if let Some(claim_queue) = maybe_claim_queue {
claim_queue
.into_iter()
.filter_map(|(core_index, paras)| Some((*paras.front()?, core_index)))
.collect()
} else {
availability_cores
.into_iter()
.enumerate()
.filter_map(|(index, core)| match core {
CoreState::Scheduled(scheduled_core) =>
Some((scheduled_core.para_id, CoreIndex(index as u32))),
CoreState::Occupied(occupied_core) =>
if max_candidate_depth >= 1 {
occupied_core
.next_up_on_available
.map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32)))
} else {
None
},
CoreState::Free => None,
})
.collect()
};

core_index
.map(|c| group_rotation_info.group_for_core(CoreIndex(c as _), availability_cores.len()))
let mut groups_per_para = HashMap::new();
// Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`.
for (para, core_index) in para_core_indices {
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index)
}

groups_per_para
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
Expand Down Expand Up @@ -2192,18 +2249,23 @@ async fn fragment_tree_update_inner<Context>(
let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash);
let prs = state.per_relay_parent.get_mut(&receipt.descriptor().relay_parent);
if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) {
let group_index = group_for_para(
&prs.availability_cores,
&prs.group_rotation_info,
receipt.descriptor().para_id,
);

let per_session = state.per_session.get(&prs.session);
if let (Some(per_session), Some(group_index)) = (per_session, group_index) {
let group_index = confirmed.group_index();

// Sanity check if group_index is valid for this para at relay parent.
let Some(expected_groups) = prs.groups_per_para.get(&receipt.descriptor().para_id)
else {
continue
};
if !expected_groups.iter().any(|g| *g == group_index) {
continue
}

if let Some(per_session) = per_session {
send_backing_fresh_statements(
ctx,
candidate_hash,
group_index,
confirmed.group_index(),
&receipt.descriptor().relay_parent,
prs,
confirmed,
Expand Down Expand Up @@ -2311,13 +2373,12 @@ async fn handle_incoming_manifest_common<'a, Context>(
Some(x) => x,
};

let expected_group = group_for_para(
&relay_parent_state.availability_cores,
&relay_parent_state.group_rotation_info,
para_id,
);
let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para_id) else {
modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
return None
};

if expected_group != Some(manifest_summary.claimed_group_index) {
if !expected_groups.iter().any(|g| g == &manifest_summary.claimed_group_index) {
modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
return None
}
Expand Down Expand Up @@ -3037,13 +3098,11 @@ pub(crate) async fn handle_response<Context>(
relay_parent_state.session,
|v| per_session.session_info.validators.get(v).map(|x| x.clone()),
|para, g_index| {
let expected_group = group_for_para(
&relay_parent_state.availability_cores,
&relay_parent_state.group_rotation_info,
para,
);
let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para) else {
return false
};

Some(g_index) == expected_group
expected_groups.iter().any(|g| g == &g_index)
},
disabled_mask,
);
Expand Down
Loading

0 comments on commit 0663172

Please sign in to comment.