From 60e537b95f2336dc598cb9ce49823316841ee3dd Mon Sep 17 00:00:00 2001 From: Andrei Sandu <54316454+sandreim@users.noreply.github.com> Date: Thu, 22 Feb 2024 15:22:31 +0700 Subject: [PATCH] Elastic scaling: use an assumed `CoreIndex` in `candidate-backing` (#3229) First step in implementing https://github.com/paritytech/polkadot-sdk/issues/3144 ### Summary of changes - switch statement `Table` candidate mapping from `ParaId` to `CoreIndex` - introduce experimental `InjectCoreIndex` node feature. - determine and assume a `CoreIndex` for a candidate based on statement validator index. If the signature is valid it means the validator controls that validator index and we can easily map it to a validator group/core. - introduce a temporary provisioner fix until we fully enable elastic scaling in the subsystem. The fix ensures we don't fetch the same backable candidate when calling `get_backable_candidate` for each core. TODO: - [x] fix backing tests - [x] fix statement table tests - [x] add new test --------- Signed-off-by: Andrei Sandu Signed-off-by: alindima Co-authored-by: alindima --- Cargo.lock | 3 + polkadot/node/core/backing/Cargo.toml | 1 + polkadot/node/core/backing/src/error.rs | 3 + polkadot/node/core/backing/src/lib.rs | 285 ++++++++++++++---- polkadot/node/core/backing/src/tests/mod.rs | 128 +++++++- .../src/tests/prospective_parachains.rs | 34 ++- polkadot/node/core/provisioner/src/lib.rs | 12 +- polkadot/primitives/Cargo.toml | 2 + polkadot/primitives/src/v6/mod.rs | 27 +- polkadot/primitives/src/vstaging/mod.rs | 6 +- polkadot/statement-table/Cargo.toml | 1 + polkadot/statement-table/src/generic.rs | 50 ++- polkadot/statement-table/src/lib.rs | 6 +- 13 files changed, 462 insertions(+), 96 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 93ee9cc99c44..950b4e0889a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12490,6 +12490,7 @@ dependencies = [ "polkadot-primitives-test-helpers", "polkadot-statement-table", "sc-keystore", + "schnellru", 
"sp-application-crypto", "sp-core", "sp-keyring", @@ -13159,6 +13160,7 @@ version = "7.0.0" dependencies = [ "bitvec", "hex-literal", + "log", "parity-scale-codec", "polkadot-core-primitives", "polkadot-parachain-primitives", @@ -13556,6 +13558,7 @@ dependencies = [ "parity-scale-codec", "polkadot-primitives", "sp-core", + "tracing-gum", ] [[package]] diff --git a/polkadot/node/core/backing/Cargo.toml b/polkadot/node/core/backing/Cargo.toml index b0cf041e38da..f71b8df80dd2 100644 --- a/polkadot/node/core/backing/Cargo.toml +++ b/polkadot/node/core/backing/Cargo.toml @@ -22,6 +22,7 @@ bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] } gum = { package = "tracing-gum", path = "../../gum" } thiserror = { workspace = true } fatality = "0.0.6" +schnellru = "0.2.1" [dev-dependencies] sp-core = { path = "../../../../substrate/primitives/core" } diff --git a/polkadot/node/core/backing/src/error.rs b/polkadot/node/core/backing/src/error.rs index 1b00a62510b7..64955a393962 100644 --- a/polkadot/node/core/backing/src/error.rs +++ b/polkadot/node/core/backing/src/error.rs @@ -48,6 +48,9 @@ pub enum Error { #[error("Candidate is not found")] CandidateNotFound, + #[error("CoreIndex cannot be determined for a candidate")] + CoreIndexUnavailable, + #[error("Signature is invalid")] InvalidSignature, diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 98bbd6232add..cc192607cea0 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -70,13 +70,14 @@ use std::{ sync::Arc, }; -use bitvec::vec::BitVec; +use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec}; use futures::{ channel::{mpsc, oneshot}, future::BoxFuture, stream::FuturesOrdered, FutureExt, SinkExt, StreamExt, TryFutureExt, }; +use schnellru::{ByLength, LruMap}; use error::{Error, FatalResult}; use polkadot_node_primitives::{ @@ -104,10 +105,12 @@ use polkadot_node_subsystem_util::{ Validator, }; use 
polkadot_primitives::{ + vstaging::{node_features::FeatureIndex, NodeFeatures}, BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt, - CommittedCandidateReceipt, CoreIndex, CoreState, ExecutorParams, Hash, Id as ParaId, - PersistedValidationData, PvfExecKind, SigningContext, ValidationCode, ValidatorId, - ValidatorIndex, ValidatorSignature, ValidityAttestation, + CommittedCandidateReceipt, CoreIndex, CoreState, ExecutorParams, GroupIndex, GroupRotationInfo, + Hash, Id as ParaId, IndexedVec, PersistedValidationData, PvfExecKind, SessionIndex, + SigningContext, ValidationCode, ValidatorId, ValidatorIndex, ValidatorSignature, + ValidityAttestation, }; use sp_keystore::KeystorePtr; use statement_table::{ @@ -118,7 +121,7 @@ use statement_table::{ }, Config as TableConfig, Context as TableContextTrait, Table, }; -use util::vstaging::get_disabled_validators_with_fallback; +use util::{runtime::request_node_features, vstaging::get_disabled_validators_with_fallback}; mod error; @@ -209,7 +212,9 @@ struct PerRelayParentState { /// The hash of the relay parent on top of which this job is doing it's work. parent: Hash, /// The `ParaId` assigned to the local validator at this relay parent. - assignment: Option, + assigned_para: Option, + /// The `CoreIndex` assigned to the local validator at this relay parent. + assigned_core: Option, /// The candidates that are backed by enough validators in their group, by hash. backed: HashSet, /// The table of candidates and statements under this relay-parent. @@ -224,6 +229,15 @@ struct PerRelayParentState { fallbacks: HashMap, /// The minimum backing votes threshold. minimum_backing_votes: u32, + /// If true, we're appending extra bits in the BackedCandidate validator indices bitfield, + /// which represent the assigned core index. True if ElasticScalingMVP is enabled. + inject_core_index: bool, + /// The core states for all cores. + cores: Vec, + /// The validator index -> group mapping at this relay parent. 
+ validator_to_group: Arc>>, + /// The associated group rotation information. + group_rotation_info: GroupRotationInfo, } struct PerCandidateState { @@ -275,6 +289,9 @@ struct State { /// This is guaranteed to have an entry for each candidate with a relay parent in the implicit /// or explicit view for which a `Seconded` statement has been successfully imported. per_candidate: HashMap, + /// Cache the per-session Validator->Group mapping. + validator_to_group_cache: + LruMap>>>, /// A cloneable sender which is dispatched to background candidate validation tasks to inform /// the main task of the result. background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>, @@ -292,6 +309,7 @@ impl State { per_leaf: HashMap::default(), per_relay_parent: HashMap::default(), per_candidate: HashMap::new(), + validator_to_group_cache: LruMap::new(ByLength::new(2)), background_validation_tx, keystore, } @@ -379,10 +397,10 @@ struct AttestingData { backing: Vec, } -#[derive(Default)] +#[derive(Default, Debug)] struct TableContext { validator: Option, - groups: HashMap>, + groups: HashMap>, validators: Vec, disabled_validators: Vec, } @@ -404,7 +422,7 @@ impl TableContext { impl TableContextTrait for TableContext { type AuthorityId = ValidatorIndex; type Digest = CandidateHash; - type GroupId = ParaId; + type GroupId = CoreIndex; type Signature = ValidatorSignature; type Candidate = CommittedCandidateReceipt; @@ -412,15 +430,11 @@ impl TableContextTrait for TableContext { candidate.hash() } - fn candidate_group(candidate: &CommittedCandidateReceipt) -> ParaId { - candidate.descriptor().para_id + fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool { + self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority)) } - fn is_member_of(&self, authority: &ValidatorIndex, group: &ParaId) -> bool { - self.groups.get(group).map_or(false, |g| g.iter().any(|a| a == authority)) - } - - fn get_group_size(&self, group: &ParaId) -> Option { + fn 
get_group_size(&self, group: &CoreIndex) -> Option { self.groups.get(group).map(|g| g.len()) } } @@ -442,19 +456,20 @@ fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedSt fn table_attested_to_backed( attested: TableAttestedCandidate< - ParaId, + CoreIndex, CommittedCandidateReceipt, ValidatorIndex, ValidatorSignature, >, table_context: &TableContext, + inject_core_index: bool, ) -> Option { - let TableAttestedCandidate { candidate, validity_votes, group_id: para_id } = attested; + let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested; let (ids, validity_votes): (Vec<_>, Vec) = validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip(); - let group = table_context.groups.get(¶_id)?; + let group = table_context.groups.get(&core_index)?; let mut validator_indices = BitVec::with_capacity(group.len()); @@ -479,6 +494,12 @@ fn table_attested_to_backed( } vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group); + if inject_core_index { + let core_index_to_inject: BitVec = + BitVec::from_vec(vec![core_index.0 as u8]); + validator_indices.extend(core_index_to_inject); + } + Some(BackedCandidate { candidate, validity_votes: vote_positions @@ -971,7 +992,14 @@ async fn handle_active_leaves_update( // construct a `PerRelayParent` from the runtime API // and insert it. - let per = construct_per_relay_parent_state(ctx, maybe_new, &state.keystore, mode).await?; + let per = construct_per_relay_parent_state( + ctx, + maybe_new, + &state.keystore, + &mut state.validator_to_group_cache, + mode, + ) + .await?; if let Some(per) = per { state.per_relay_parent.insert(maybe_new, per); @@ -981,31 +1009,112 @@ async fn handle_active_leaves_update( Ok(()) } +macro_rules! try_runtime_api { + ($x: expr) => { + match $x { + Ok(x) => x, + Err(err) => { + // Only bubble up fatal errors. 
+ error::log_error(Err(Into::::into(err).into()))?; + + // We can't do candidate validation work if we don't have the + // requisite runtime API data. But these errors should not take + // down the node. + return Ok(None) + }, + } + }; +} + +fn core_index_from_statement( + validator_to_group: &IndexedVec>, + group_rotation_info: &GroupRotationInfo, + cores: &[CoreState], + statement: &SignedFullStatementWithPVD, +) -> Option { + let compact_statement = statement.as_unchecked(); + let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash()); + + let n_cores = cores.len(); + + gum::trace!( + target:LOG_TARGET, + ?group_rotation_info, + ?statement, + ?validator_to_group, + n_cores = ?cores.len(), + ?candidate_hash, + "Extracting core index from statement" + ); + + let statement_validator_index = statement.validator_index(); + let Some(Some(group_index)) = validator_to_group.get(statement_validator_index) else { + gum::debug!( + target: LOG_TARGET, + ?group_rotation_info, + ?statement, + ?validator_to_group, + n_cores = ?cores.len() , + ?candidate_hash, + "Invalid validator index: {:?}", + statement_validator_index + ); + return None + }; + + // First check if the statement para id matches the core assignment. 
+ let core_index = group_rotation_info.core_for_group(*group_index, n_cores); + + if core_index.0 as usize > n_cores { + gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex"); + return None + } + + if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() { + let candidate_para_id = candidate.descriptor.para_id; + let assigned_para_id = match &cores[core_index.0 as usize] { + CoreState::Free => { + gum::debug!(target: LOG_TARGET, ?candidate_hash, "Invalid CoreIndex, core is not assigned to any para_id"); + return None + }, + CoreState::Occupied(occupied) => + if let Some(next) = &occupied.next_up_on_available { + next.para_id + } else { + return None + }, + CoreState::Scheduled(scheduled) => scheduled.para_id, + }; + + if assigned_para_id != candidate_para_id { + gum::debug!( + target: LOG_TARGET, + ?candidate_hash, + ?core_index, + ?assigned_para_id, + ?candidate_para_id, + "Invalid CoreIndex, core is assigned to a different para_id" + ); + return None + } + return Some(core_index) + } else { + return Some(core_index) + } +} + /// Load the data necessary to do backing work on top of a relay-parent. #[overseer::contextbounds(CandidateBacking, prefix = self::overseer)] async fn construct_per_relay_parent_state( ctx: &mut Context, relay_parent: Hash, keystore: &KeystorePtr, + validator_to_group_cache: &mut LruMap< + SessionIndex, + Arc>>, + >, mode: ProspectiveParachainsMode, ) -> Result, Error> { - macro_rules! try_runtime_api { - ($x: expr) => { - match $x { - Ok(x) => x, - Err(err) => { - // Only bubble up fatal errors. - error::log_error(Err(Into::::into(err).into()))?; - - // We can't do candidate validation work if we don't have the - // requisite runtime API data. But these errors should not take - // down the node. 
- return Ok(None) - }, - } - }; - } - let parent = relay_parent; let (session_index, validators, groups, cores) = futures::try_join!( @@ -1020,6 +1129,16 @@ async fn construct_per_relay_parent_state( .map_err(Error::JoinMultiple)?; let session_index = try_runtime_api!(session_index); + + let inject_core_index = request_node_features(parent, session_index, ctx.sender()) + .await? + .unwrap_or(NodeFeatures::EMPTY) + .get(FeatureIndex::ElasticScalingMVP as usize) + .map(|b| *b) + .unwrap_or(false); + + gum::debug!(target: LOG_TARGET, inject_core_index, ?parent, "New state"); + let validators: Vec<_> = try_runtime_api!(validators); let (validator_groups, group_rotation_info) = try_runtime_api!(groups); let cores = try_runtime_api!(cores); @@ -1055,18 +1174,24 @@ async fn construct_per_relay_parent_state( }, }; - let mut groups = HashMap::new(); let n_cores = cores.len(); - let mut assignment = None; - for (idx, core) in cores.into_iter().enumerate() { + let mut groups = HashMap::>::new(); + let mut assigned_core = None; + let mut assigned_para = None; + + for (idx, core) in cores.iter().enumerate() { let core_para_id = match core { CoreState::Scheduled(scheduled) => scheduled.para_id, CoreState::Occupied(occupied) => if mode.is_enabled() { // Async backing makes it legal to build on top of // occupied core. 
- occupied.candidate_descriptor.para_id + if let Some(next) = &occupied.next_up_on_available { + next.para_id + } else { + continue + } } else { continue }, @@ -1077,11 +1202,27 @@ async fn construct_per_relay_parent_state( let group_index = group_rotation_info.group_for_core(core_index, n_cores); if let Some(g) = validator_groups.get(group_index.0 as usize) { if validator.as_ref().map_or(false, |v| g.contains(&v.index())) { - assignment = Some(core_para_id); + assigned_para = Some(core_para_id); + assigned_core = Some(core_index); } - groups.insert(core_para_id, g.clone()); + groups.insert(core_index, g.clone()); } } + gum::debug!(target: LOG_TARGET, ?groups, "TableContext"); + + let validator_to_group = validator_to_group_cache + .get_or_insert(session_index, || { + let mut vector = vec![None; validators.len()]; + + for (group_idx, validator_group) in validator_groups.iter().enumerate() { + for validator in validator_group { + vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32)); + } + } + + Arc::new(IndexedVec::<_, _>::from(vector)) + }) + .expect("Just inserted"); let table_context = TableContext { validator, groups, validators, disabled_validators }; let table_config = TableConfig { @@ -1094,7 +1235,8 @@ async fn construct_per_relay_parent_state( Ok(Some(PerRelayParentState { prospective_parachains_mode: mode, parent, - assignment, + assigned_core, + assigned_para, backed: HashSet::new(), table: Table::new(table_config), table_context, @@ -1102,6 +1244,10 @@ async fn construct_per_relay_parent_state( awaiting_validation: HashSet::new(), fallbacks: HashMap::new(), minimum_backing_votes, + inject_core_index, + cores, + validator_to_group: validator_to_group.clone(), + group_rotation_info, })) } @@ -1519,15 +1665,16 @@ async fn import_statement( per_candidate: &mut HashMap, statement: &SignedFullStatementWithPVD, ) -> Result, Error> { + let candidate_hash = statement.payload().candidate_hash(); + gum::debug!( target: LOG_TARGET, statement = 
?statement.payload().to_compact(), validator_index = statement.validator_index().0, + ?candidate_hash, "Importing statement", ); - let candidate_hash = statement.payload().candidate_hash(); - // If this is a new candidate (statement is 'seconded' and candidate is unknown), // we need to create an entry in the `PerCandidateState` map. // @@ -1593,7 +1740,15 @@ async fn import_statement( let stmt = primitive_statement_to_table(statement); - Ok(rp_state.table.import_statement(&rp_state.table_context, stmt)) + let core = core_index_from_statement( + &rp_state.validator_to_group, + &rp_state.group_rotation_info, + &rp_state.cores, + statement, + ) + .ok_or(Error::CoreIndexUnavailable)?; + + Ok(rp_state.table.import_statement(&rp_state.table_context, core, stmt)) } /// Handles a summary received from [`import_statement`] and dispatches `Backed` notifications and @@ -1615,7 +1770,11 @@ async fn post_import_statement_actions( // `HashSet::insert` returns true if the thing wasn't in there already. 
if rp_state.backed.insert(candidate_hash) { - if let Some(backed) = table_attested_to_backed(attested, &rp_state.table_context) { + if let Some(backed) = table_attested_to_backed( + attested, + &rp_state.table_context, + rp_state.inject_core_index, + ) { let para_id = backed.candidate.descriptor.para_id; gum::debug!( target: LOG_TARGET, @@ -1654,8 +1813,14 @@ async fn post_import_statement_actions( ); ctx.send_unbounded_message(message); } + } else { + gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate"); } + } else { + gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known"); } + } else { + gum::debug!(target: LOG_TARGET, "No attested candidate"); } issue_new_misbehaviors(ctx, rp_state.parent, &mut rp_state.table); @@ -1859,9 +2024,10 @@ async fn maybe_validate_and_import( let candidate_hash = summary.candidate; - if Some(summary.group_id) != rp_state.assignment { + if Some(summary.group_id) != rp_state.assigned_core { return Ok(()) } + let attesting = match statement.payload() { StatementWithPVD::Seconded(receipt, _) => { let attesting = AttestingData { @@ -2004,10 +2170,11 @@ async fn handle_second_message( } // Sanity check that candidate is from our assignment. 
- if Some(candidate.descriptor().para_id) != rp_state.assignment { + if Some(candidate.descriptor().para_id) != rp_state.assigned_para { gum::debug!( target: LOG_TARGET, - our_assignment = ?rp_state.assignment, + our_assignment_core = ?rp_state.assigned_core, + our_assignment_para = ?rp_state.assigned_para, collation = ?candidate.descriptor().para_id, "Subsystem asked to second for para outside of our assignment", ); @@ -2015,6 +2182,14 @@ async fn handle_second_message( return Ok(()) } + gum::debug!( + target: LOG_TARGET, + our_assignment_core = ?rp_state.assigned_core, + our_assignment_para = ?rp_state.assigned_para, + collation = ?candidate.descriptor().para_id, + "Current assignments vs collation", + ); + // If the message is a `CandidateBackingMessage::Second`, sign and dispatch a // Seconded statement only if we have not signed a Valid statement for the requested candidate. // @@ -2087,7 +2262,13 @@ fn handle_get_backed_candidates_message( &rp_state.table_context, rp_state.minimum_backing_votes, ) - .and_then(|attested| table_attested_to_backed(attested, &rp_state.table_context)) + .and_then(|attested| { + table_attested_to_backed( + attested, + &rp_state.table_context, + rp_state.inject_core_index, + ) + }) }) .collect(); diff --git a/polkadot/node/core/backing/src/tests/mod.rs b/polkadot/node/core/backing/src/tests/mod.rs index 1957f4e19c54..7223f1e1dfb0 100644 --- a/polkadot/node/core/backing/src/tests/mod.rs +++ b/polkadot/node/core/backing/src/tests/mod.rs @@ -65,13 +65,14 @@ fn dummy_pvd() -> PersistedValidationData { } } -struct TestState { +pub(crate) struct TestState { chain_ids: Vec, keystore: KeystorePtr, validators: Vec, validator_public: Vec, validation_data: PersistedValidationData, validator_groups: (Vec>, GroupRotationInfo), + validator_to_group: IndexedVec>, availability_cores: Vec, head_data: HashMap, signing_context: SigningContext, @@ -114,6 +115,11 @@ impl Default for TestState { .into_iter() .map(|g| 
g.into_iter().map(ValidatorIndex).collect()) .collect(); + let validator_to_group: IndexedVec<_, _> = + vec![Some(0), Some(1), Some(0), Some(0), None, Some(0)] + .into_iter() + .map(|x| x.map(|x| GroupIndex(x))) + .collect(); let group_rotation_info = GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 100, now: 1 }; @@ -143,6 +149,7 @@ impl Default for TestState { validators, validator_public, validator_groups: (validator_groups, group_rotation_info), + validator_to_group, availability_cores, head_data, validation_data, @@ -285,6 +292,16 @@ async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &TestS } ); + // Node features request from runtime: all features are disabled. + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_parent, RuntimeApiRequest::NodeFeatures(_session_index, tx)) + ) => { + tx.send(Ok(Default::default())).unwrap(); + } + ); + // Check if subsystem job issues a request for the minimum backing votes. 
assert_matches!( virtual_overseer.recv().await, @@ -639,6 +656,107 @@ fn backing_works() { }); } +#[test] +fn extract_core_index_from_statement_works() { + let test_state = TestState::default(); + + let pov_a = PoV { block_data: BlockData(vec![42, 43, 44]) }; + let pvd_a = dummy_pvd(); + let validation_code_a = ValidationCode(vec![1, 2, 3]); + + let pov_hash = pov_a.hash(); + + let mut candidate = TestCandidateBuilder { + para_id: test_state.chain_ids[0], + relay_parent: test_state.relay_parent, + pov_hash, + erasure_root: make_erasure_root(&test_state, pov_a.clone(), pvd_a.clone()), + persisted_validation_data_hash: pvd_a.hash(), + validation_code: validation_code_a.0.clone(), + ..Default::default() + } + .build(); + + let public2 = Keystore::sr25519_generate_new( + &*test_state.keystore, + ValidatorId::ID, + Some(&test_state.validators[2].to_seed()), + ) + .expect("Insert key into keystore"); + + let signed_statement_1 = SignedFullStatementWithPVD::sign( + &test_state.keystore, + StatementWithPVD::Seconded(candidate.clone(), pvd_a.clone()), + &test_state.signing_context, + ValidatorIndex(2), + &public2.into(), + ) + .ok() + .flatten() + .expect("should be signed"); + + let public1 = Keystore::sr25519_generate_new( + &*test_state.keystore, + ValidatorId::ID, + Some(&test_state.validators[1].to_seed()), + ) + .expect("Insert key into keystore"); + + let signed_statement_2 = SignedFullStatementWithPVD::sign( + &test_state.keystore, + StatementWithPVD::Seconded(candidate.clone(), pvd_a.clone()), + &test_state.signing_context, + ValidatorIndex(1), + &public1.into(), + ) + .ok() + .flatten() + .expect("should be signed"); + + candidate.descriptor.para_id = test_state.chain_ids[1]; + + let signed_statement_3 = SignedFullStatementWithPVD::sign( + &test_state.keystore, + StatementWithPVD::Seconded(candidate, pvd_a.clone()), + &test_state.signing_context, + ValidatorIndex(1), + &public1.into(), + ) + .ok() + .flatten() + .expect("should be signed"); + + let core_index_1 = 
core_index_from_statement( + &test_state.validator_to_group, + &test_state.validator_groups.1, + &test_state.availability_cores, + &signed_statement_1, + ) + .unwrap(); + + assert_eq!(core_index_1, CoreIndex(0)); + + let core_index_2 = core_index_from_statement( + &test_state.validator_to_group, + &test_state.validator_groups.1, + &test_state.availability_cores, + &signed_statement_2, + ); + + // Must be none, para_id in descriptor is different than para assigned to core + assert_eq!(core_index_2, None); + + let core_index_3 = core_index_from_statement( + &test_state.validator_to_group, + &test_state.validator_groups.1, + &test_state.availability_cores, + &signed_statement_3, + ) + .unwrap(); + + assert_eq!(core_index_3, CoreIndex(1)); +} + #[test] fn backing_works_while_validation_ongoing() { let test_state = TestState::default(); @@ -1422,7 +1540,7 @@ fn backing_works_after_failed_validation() { fn candidate_backing_reorders_votes() { use sp_core::Encode; - let para_id = ParaId::from(10); + let core_idx = CoreIndex(10); let validators = vec![ Sr25519Keyring::Alice, Sr25519Keyring::Bob, @@ -1436,7 +1554,7 @@ fn candidate_backing_reorders_votes() { let validator_groups = { let mut validator_groups = HashMap::new(); validator_groups - .insert(para_id, vec![0, 1, 2, 3, 4, 5].into_iter().map(ValidatorIndex).collect()); + .insert(core_idx, vec![0, 1, 2, 3, 4, 5].into_iter().map(ValidatorIndex).collect()); validator_groups }; @@ -1466,10 +1584,10 @@ fn candidate_backing_reorders_votes() { (ValidatorIndex(3), fake_attestation(3)), (ValidatorIndex(1), fake_attestation(1)), ], - group_id: para_id, + group_id: core_idx, }; - let backed = table_attested_to_backed(attested, &table_context).unwrap(); + let backed = table_attested_to_backed(attested, &table_context, false).unwrap(); let expected_bitvec = { let mut validator_indices = BitVec::::with_capacity(6); diff --git a/polkadot/node/core/backing/src/tests/prospective_parachains.rs 
b/polkadot/node/core/backing/src/tests/prospective_parachains.rs index 578f21bef665..94310d2aa164 100644 --- a/polkadot/node/core/backing/src/tests/prospective_parachains.rs +++ b/polkadot/node/core/backing/src/tests/prospective_parachains.rs @@ -185,6 +185,16 @@ async fn activate_leaf( } ); + // Node features request from runtime: all features are disabled. + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::NodeFeatures(_session_index, tx)) + ) if parent == hash => { + tx.send(Ok(Default::default())).unwrap(); + } + ); + // Check if subsystem job issues a request for the minimum backing votes. assert_matches!( virtual_overseer.recv().await, @@ -305,10 +315,11 @@ async fn assert_hypothetical_frontier_requests( ) => { let idx = match expected_requests.iter().position(|r| r.0 == request) { Some(idx) => idx, - None => panic!( + None => + panic!( "unexpected hypothetical frontier request, no match found for {:?}", request - ), + ), }; let resp = std::mem::take(&mut expected_requests[idx].1); tx.send(resp).unwrap(); @@ -1268,6 +1279,7 @@ fn concurrent_dependent_candidates() { let statement_b = CandidateBackingMessage::Statement(leaf_parent, signed_b.clone()); virtual_overseer.send(FromOrchestra::Communication { msg: statement_a }).await; + // At this point the subsystem waits for response, the previous message is received, // send a second one without blocking. 
let _ = virtual_overseer @@ -1388,7 +1400,19 @@ fn concurrent_dependent_candidates() { assert_eq!(sess_idx, 1); tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _parent, + RuntimeApiRequest::ValidatorGroups(tx), + )) => { + tx.send(Ok(test_state.validator_groups.clone())).unwrap(); + }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _parent, + RuntimeApiRequest::AvailabilityCores(tx), + )) => { + tx.send(Ok(test_state.availability_cores.clone())).unwrap(); + }, _ => panic!("unexpected message received from overseer: {:?}", msg), } } @@ -1419,7 +1443,6 @@ fn seconding_sanity_check_occupy_same_depth() { let leaf_parent = get_parent_hash(leaf_hash); let activated = new_leaf(leaf_hash, LEAF_BLOCK_NUMBER); - let min_block_number = LEAF_BLOCK_NUMBER - LEAF_ANCESTRY_LEN; let min_relay_parents = vec![(para_id_a, min_block_number), (para_id_b, min_block_number)]; let test_leaf_a = TestLeaf { activated, min_relay_parents }; @@ -1555,13 +1578,14 @@ fn occupied_core_assignment() { const LEAF_A_BLOCK_NUMBER: BlockNumber = 100; const LEAF_A_ANCESTRY_LEN: BlockNumber = 3; let para_id = test_state.chain_ids[0]; + let previous_para_id = test_state.chain_ids[1]; // Set the core state to occupied. 
let mut candidate_descriptor = ::test_helpers::dummy_candidate_descriptor(Hash::zero()); - candidate_descriptor.para_id = para_id; + candidate_descriptor.para_id = previous_para_id; test_state.availability_cores[0] = CoreState::Occupied(OccupiedCore { group_responsible: Default::default(), - next_up_on_available: None, + next_up_on_available: Some(ScheduledCore { para_id, collator: None }), occupied_since: 100_u32, time_out_at: 200_u32, next_up_on_time_out: None, diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 51f768d782e0..d98f6ebfe428 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -681,10 +681,17 @@ async fn request_backable_candidates( CoreState::Free => continue, }; + // We should be calling this once per para rather than per core. + // TODO: Will be fixed in https://github.com/paritytech/polkadot-sdk/pull/3233. + // For now, at least make sure we don't supply the same candidate multiple times in case a + // para has multiple cores scheduled. 
let response = get_backable_candidate(relay_parent, para_id, required_path, sender).await?; - match response { - Some((hash, relay_parent)) => selected_candidates.push((hash, relay_parent)), + Some((hash, relay_parent)) => { + if !selected_candidates.iter().any(|bc| &(hash, relay_parent) == bc) { + selected_candidates.push((hash, relay_parent)) + } + }, None => { gum::debug!( target: LOG_TARGET, @@ -726,6 +733,7 @@ async fn select_candidates( ) .await?, }; + gum::debug!(target: LOG_TARGET, ?selected_candidates, "Got backable candidates"); // now get the backed candidates corresponding to these candidate receipts let (tx, rx) = oneshot::channel(); diff --git a/polkadot/primitives/Cargo.toml b/polkadot/primitives/Cargo.toml index 58d3d1001610..e63fb621c788 100644 --- a/polkadot/primitives/Cargo.toml +++ b/polkadot/primitives/Cargo.toml @@ -14,6 +14,7 @@ bitvec = { version = "1.0.0", default-features = false, features = ["alloc", "se hex-literal = "0.4.1" parity-scale-codec = { version = "3.6.1", default-features = false, features = ["bit-vec", "derive"] } scale-info = { version = "2.10.0", default-features = false, features = ["bit-vec", "derive", "serde"] } +log = { workspace = true, default-features = false } serde = { features = ["alloc", "derive"], workspace = true } application-crypto = { package = "sp-application-crypto", path = "../../substrate/primitives/application-crypto", default-features = false, features = ["serde"] } @@ -38,6 +39,7 @@ std = [ "application-crypto/std", "bitvec/std", "inherents/std", + "log/std", "parity-scale-codec/std", "polkadot-core-primitives/std", "polkadot-parachain-primitives/std", diff --git a/polkadot/primitives/src/v6/mod.rs b/polkadot/primitives/src/v6/mod.rs index 4938d20d2d1b..538eb3855848 100644 --- a/polkadot/primitives/src/v6/mod.rs +++ b/polkadot/primitives/src/v6/mod.rs @@ -72,6 +72,7 @@ pub use metrics::{ /// The key type ID for a collator key. 
pub const COLLATOR_KEY_TYPE_ID: KeyTypeId = KeyTypeId(*b"coll"); +const LOG_TARGET: &str = "runtime::primitives"; mod collator_app { use application_crypto::{app_crypto, sr25519}; @@ -746,17 +747,29 @@ impl BackedCandidate { /// /// Returns either an error, indicating that one of the signatures was invalid or that the index /// was out-of-bounds, or the number of signatures checked. -pub fn check_candidate_backing + Clone + Encode>( +pub fn check_candidate_backing + Clone + Encode + core::fmt::Debug>( backed: &BackedCandidate, signing_context: &SigningContext, group_len: usize, validator_lookup: impl Fn(usize) -> Option, ) -> Result { if backed.validator_indices.len() != group_len { + log::debug!( + target: LOG_TARGET, + "Check candidate backing: indices mismatch: group_len = {} , indices_len = {}", + group_len, + backed.validator_indices.len(), + ); return Err(()) } if backed.validity_votes.len() > group_len { + log::debug!( + target: LOG_TARGET, + "Check candidate backing: Too many votes, expected: {}, found: {}", + group_len, + backed.validity_votes.len(), + ); return Err(()) } @@ -778,11 +791,23 @@ pub fn check_candidate_backing + Clone + Encode>( if sig.verify(&payload[..], &validator_id) { signed += 1; } else { + log::debug!( + target: LOG_TARGET, + "Check candidate backing: Invalid signature. validator_id = {:?}, validator_index = {} ", + validator_id, + val_in_group_idx, + ); return Err(()) } } if signed != backed.validity_votes.len() { + log::error!( + target: LOG_TARGET, + "Check candidate backing: Too many signatures, expected = {}, found = {}", + backed.validity_votes.len() , + signed, + ); return Err(()) } diff --git a/polkadot/primitives/src/vstaging/mod.rs b/polkadot/primitives/src/vstaging/mod.rs index 630bcf8679ad..39d9dfc02c5b 100644 --- a/polkadot/primitives/src/vstaging/mod.rs +++ b/polkadot/primitives/src/vstaging/mod.rs @@ -64,9 +64,13 @@ pub mod node_features { /// Tells if tranch0 assignments could be sent in a single certificate. 
/// Reserved for: `` EnableAssignmentsV2 = 0, + /// This feature enables the extension of `BackedCandidate::validator_indices` by 8 bits. + /// The value stored there represents the assumed core index where the candidates + /// are backed. This is needed for the elastic scaling MVP. + ElasticScalingMVP = 1, /// First unassigned feature bit. /// Every time a new feature flag is assigned it should take this value. /// and this should be incremented. - FirstUnassigned = 1, + FirstUnassigned = 2, } } diff --git a/polkadot/statement-table/Cargo.toml b/polkadot/statement-table/Cargo.toml index 6403b822ed9b..37b8a99d640a 100644 --- a/polkadot/statement-table/Cargo.toml +++ b/polkadot/statement-table/Cargo.toml @@ -13,3 +13,4 @@ workspace = true parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] } sp-core = { path = "../../substrate/primitives/core" } primitives = { package = "polkadot-primitives", path = "../primitives" } +gum = { package = "tracing-gum", path = "../node/gum" } diff --git a/polkadot/statement-table/src/generic.rs b/polkadot/statement-table/src/generic.rs index 22bffde5acc1..2ee6f6a4f781 100644 --- a/polkadot/statement-table/src/generic.rs +++ b/polkadot/statement-table/src/generic.rs @@ -36,6 +36,7 @@ use primitives::{ }; use parity_scale_codec::{Decode, Encode}; +const LOG_TARGET: &str = "parachain::statement-table"; /// Context for the statement table. pub trait Context { @@ -53,9 +54,6 @@ pub trait Context { /// get the digest of a candidate. fn candidate_digest(candidate: &Self::Candidate) -> Self::Digest; - /// get the group of a candidate. - fn candidate_group(candidate: &Self::Candidate) -> Self::GroupId; - /// Whether a authority is a member of a group. /// Members are meant to submit candidates and vote on validity. 
fn is_member_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool; @@ -342,13 +340,13 @@ impl Table { pub fn import_statement( &mut self, context: &Ctx, + group_id: Ctx::GroupId, statement: SignedStatement, ) -> Option> { let SignedStatement { statement, signature, sender: signer } = statement; - let res = match statement { Statement::Seconded(candidate) => - self.import_candidate(context, signer.clone(), candidate, signature), + self.import_candidate(context, signer.clone(), candidate, signature, group_id), Statement::Valid(digest) => self.validity_vote(context, signer.clone(), digest, ValidityVote::Valid(signature)), }; @@ -387,9 +385,10 @@ impl Table { authority: Ctx::AuthorityId, candidate: Ctx::Candidate, signature: Ctx::Signature, + group: Ctx::GroupId, ) -> ImportResult { - let group = Ctx::candidate_group(&candidate); if !context.is_member_of(&authority, &group) { + gum::debug!(target: LOG_TARGET, authority = ?authority, group = ?group, "New `Misbehavior::UnauthorizedStatement`, candidate backed by validator that doesn't belong to expected group" ); return Err(Misbehavior::UnauthorizedStatement(UnauthorizedStatement { statement: SignedStatement { signature, @@ -634,10 +633,6 @@ mod tests { Digest(candidate.1) } - fn candidate_group(candidate: &Candidate) -> GroupId { - GroupId(candidate.0) - } - fn is_member_of(&self, authority: &AuthorityId, group: &GroupId) -> bool { self.authorities.get(authority).map(|v| v == group).unwrap_or(false) } @@ -675,10 +670,10 @@ mod tests { sender: AuthorityId(1), }; - table.import_statement(&context, statement_a); + table.import_statement(&context, GroupId(2), statement_a); assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); - table.import_statement(&context, statement_b); + table.import_statement(&context, GroupId(2), statement_b); assert_eq!( table.detected_misbehavior[&AuthorityId(1)][0], Misbehavior::MultipleCandidates(MultipleCandidates { @@ -711,10 +706,10 @@ mod tests { sender: 
AuthorityId(1), }; - table.import_statement(&context, statement_a); + table.import_statement(&context, GroupId(2), statement_a); assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); - table.import_statement(&context, statement_b); + table.import_statement(&context, GroupId(2), statement_b); assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); } @@ -735,7 +730,7 @@ mod tests { sender: AuthorityId(1), }; - table.import_statement(&context, statement); + table.import_statement(&context, GroupId(2), statement); assert_eq!( table.detected_misbehavior[&AuthorityId(1)][0], @@ -769,7 +764,7 @@ mod tests { }; let candidate_a_digest = Digest(100); - table.import_statement(&context, candidate_a); + table.import_statement(&context, GroupId(2), candidate_a); assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2))); @@ -779,7 +774,7 @@ mod tests { signature: Signature(2), sender: AuthorityId(2), }; - table.import_statement(&context, bad_validity_vote); + table.import_statement(&context, GroupId(3), bad_validity_vote); assert_eq!( table.detected_misbehavior[&AuthorityId(2)][0], @@ -811,7 +806,7 @@ mod tests { sender: AuthorityId(1), }; - table.import_statement(&context, statement); + table.import_statement(&context, GroupId(2), statement); assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); let invalid_statement = SignedStatement { @@ -820,7 +815,7 @@ mod tests { sender: AuthorityId(1), }; - table.import_statement(&context, invalid_statement); + table.import_statement(&context, GroupId(2), invalid_statement); assert!(table.detected_misbehavior.contains_key(&AuthorityId(1))); } @@ -842,7 +837,7 @@ mod tests { }; let candidate_digest = Digest(100); - table.import_statement(&context, statement); + table.import_statement(&context, GroupId(2), statement); assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); let extra_vote = SignedStatement { @@ 
-851,7 +846,7 @@ mod tests { sender: AuthorityId(1), }; - table.import_statement(&context, extra_vote); + table.import_statement(&context, GroupId(2), extra_vote); assert_eq!( table.detected_misbehavior[&AuthorityId(1)][0], Misbehavior::ValidityDoubleVote(ValidityDoubleVote::IssuedAndValidity( @@ -910,7 +905,7 @@ mod tests { }; let candidate_digest = Digest(100); - table.import_statement(&context, statement); + table.import_statement(&context, GroupId(2), statement); assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); assert!(table.attested_candidate(&candidate_digest, &context, 2).is_none()); @@ -921,7 +916,7 @@ mod tests { sender: AuthorityId(2), }; - table.import_statement(&context, vote); + table.import_statement(&context, GroupId(2), vote); assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2))); assert!(table.attested_candidate(&candidate_digest, &context, 2).is_some()); } @@ -944,7 +939,7 @@ mod tests { }; let summary = table - .import_statement(&context, statement) + .import_statement(&context, GroupId(2), statement) .expect("candidate import to give summary"); assert_eq!(summary.candidate, Digest(100)); @@ -971,7 +966,7 @@ mod tests { }; let candidate_digest = Digest(100); - table.import_statement(&context, statement); + table.import_statement(&context, GroupId(2), statement); assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); let vote = SignedStatement { @@ -980,8 +975,9 @@ mod tests { sender: AuthorityId(2), }; - let summary = - table.import_statement(&context, vote).expect("candidate vote to give summary"); + let summary = table + .import_statement(&context, GroupId(2), vote) + .expect("candidate vote to give summary"); assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2))); diff --git a/polkadot/statement-table/src/lib.rs b/polkadot/statement-table/src/lib.rs index d4629330ac01..3740d15cc4f3 100644 --- a/polkadot/statement-table/src/lib.rs +++ b/polkadot/statement-table/src/lib.rs @@ -35,8 +35,8 
@@ pub use generic::{Config, Context, Table}; pub mod v2 { use crate::generic; use primitives::{ - CandidateHash, CommittedCandidateReceipt, CompactStatement as PrimitiveStatement, Id, - ValidatorIndex, ValidatorSignature, + CandidateHash, CommittedCandidateReceipt, CompactStatement as PrimitiveStatement, + CoreIndex, ValidatorIndex, ValidatorSignature, }; /// Statements about candidates on the network. @@ -59,7 +59,7 @@ pub mod v2 { >; /// A summary of import of a statement. - pub type Summary = generic::Summary; + pub type Summary = generic::Summary; impl<'a> From<&'a Statement> for PrimitiveStatement { fn from(s: &'a Statement) -> PrimitiveStatement {