From 0ff3f4d3af0036bbae624011b720bfd5e93ce91b Mon Sep 17 00:00:00 2001 From: ordian Date: Tue, 9 Jan 2024 07:44:19 +0100 Subject: [PATCH] dispute-coordinator: disabling in participation (#2637) Closes #2225. - [x] tests - [x] fix todos - [x] fix duplicates - [x] make the check part of `potential_spam` - [x] fix a bug with votes insertion - [x] guide changes - [x] docs --------- Co-authored-by: Tsvetomir Dimitrov --- polkadot/node/core/backing/src/lib.rs | 7 +- .../core/dispute-coordinator/src/import.rs | 27 +- .../dispute-coordinator/src/initialized.rs | 155 ++- .../node/core/dispute-coordinator/src/lib.rs | 15 +- .../src/participation/tests.rs | 1 - .../core/dispute-coordinator/src/tests.rs | 987 +++++++++++++++--- polkadot/node/subsystem-util/src/lib.rs | 18 +- .../node/subsystem-util/src/runtime/mod.rs | 28 +- polkadot/node/subsystem-util/src/vstaging.rs | 6 +- polkadot/primitives/src/v6/mod.rs | 21 +- .../src/node/disputes/dispute-coordinator.md | 22 + prdoc/pr_2637.prdoc | 18 + 12 files changed, 1135 insertions(+), 170 deletions(-) create mode 100644 prdoc/pr_2637.prdoc diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index ba25ebb192e2..98bbd6232add 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -1030,9 +1030,10 @@ async fn construct_per_relay_parent_state( // Once runtime ver `DISABLED_VALIDATORS_RUNTIME_REQUIREMENT` is released remove this call to // `get_disabled_validators_with_fallback`, add `request_disabled_validators` call to the // `try_join!` above and use `try_runtime_api!` to get `disabled_validators` - let disabled_validators = get_disabled_validators_with_fallback(ctx.sender(), parent) - .await - .map_err(Error::UtilError)?; + let disabled_validators = + get_disabled_validators_with_fallback(ctx.sender(), parent).await.map_err(|e| { + Error::UtilError(TryFrom::try_from(e).expect("the conversion is infallible; qed")) + })?; let signing_context = SigningContext { parent_hash: parent, session_index }; let validator = match Validator::construct( diff --git a/polkadot/node/core/dispute-coordinator/src/import.rs b/polkadot/node/core/dispute-coordinator/src/import.rs index 98c12bd509b4..278561d5d00c 100644 --- a/polkadot/node/core/dispute-coordinator/src/import.rs +++ b/polkadot/node/core/dispute-coordinator/src/import.rs @@ -52,6 +52,8 @@ pub struct CandidateEnvironment<'a> { executor_params: &'a ExecutorParams, /// Validator indices controlled by this node. controlled_indices: HashSet, + /// Indices of disabled validators at the `relay_parent`. + disabled_indices: HashSet, } #[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)] @@ -66,6 +68,16 @@ impl<'a> CandidateEnvironment<'a> { session_index: SessionIndex, relay_parent: Hash, ) -> Option> { + let disabled_indices = runtime_info + .get_disabled_validators(ctx.sender(), relay_parent) + .await + .unwrap_or_else(|err| { + gum::info!(target: LOG_TARGET, ?err, "Failed to get disabled validators"); + Vec::new() + }) + .into_iter() + .collect(); + let (session, executor_params) = match runtime_info .get_session_info_by_index(ctx.sender(), relay_parent, session_index) .await @@ -76,7 +88,7 @@ impl<'a> CandidateEnvironment<'a> { }; let controlled_indices = find_controlled_validator_indices(keystore, &session.validators); - Some(Self { session_index, session, executor_params, controlled_indices }) + Some(Self { session_index, session, executor_params, controlled_indices, disabled_indices }) } /// Validators in the candidate's session. @@ -103,6 +115,11 @@ impl<'a> CandidateEnvironment<'a> { pub fn controlled_indices(&'a self) -> &'a HashSet { &self.controlled_indices } + + /// Indices of disabled validators at the `relay_parent`. + pub fn disabled_indices(&'a self) -> &'a HashSet { + &self.disabled_indices + } } /// Whether or not we already issued some statement about a candidate. @@ -344,6 +361,14 @@ impl CandidateVoteState { &self.votes.candidate_receipt } + /// Returns true if all the invalid votes are from disabled validators. + pub fn invalid_votes_all_disabled( + &self, + mut is_disabled: impl FnMut(&ValidatorIndex) -> bool, + ) -> bool { + self.votes.invalid.keys().all(|i| is_disabled(i)) + } + /// Extract `CandidateVotes` for handling import of new statements. fn into_old_state(self) -> (CandidateVotes, CandidateVoteState<()>) { let CandidateVoteState { votes, own_vote, dispute_status, byzantine_threshold_against } = diff --git a/polkadot/node/core/dispute-coordinator/src/initialized.rs b/polkadot/node/core/dispute-coordinator/src/initialized.rs index d9cd4e39d3cb..a1bcc1f01707 100644 --- a/polkadot/node/core/dispute-coordinator/src/initialized.rs +++ b/polkadot/node/core/dispute-coordinator/src/initialized.rs @@ -17,7 +17,7 @@ //! Dispute coordinator subsystem in initialized state (after first active leaf is received). use std::{ - collections::{BTreeMap, VecDeque}, + collections::{BTreeMap, HashSet, VecDeque}, sync::Arc, }; @@ -47,6 +47,7 @@ use polkadot_primitives::{ DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, ValidDisputeStatementKind, ValidatorId, ValidatorIndex, }; +use schnellru::{LruMap, UnlimitedCompact}; use crate::{ db, @@ -92,6 +93,9 @@ pub struct InitialData { pub(crate) struct Initialized { keystore: Arc, runtime_info: RuntimeInfo, + /// We have the onchain state of disabled validators as well as the offchain + /// state that is based on the lost disputes. + offchain_disabled_validators: OffchainDisabledValidators, /// This is the highest `SessionIndex` seen via `ActiveLeavesUpdate`. It doesn't matter if it /// was cached successfully or not. It is used to detect ancient disputes. highest_session_seen: SessionIndex, @@ -130,10 +134,12 @@ impl Initialized { let (participation_sender, participation_receiver) = mpsc::channel(1); let participation = Participation::new(participation_sender, metrics.clone()); + let offchain_disabled_validators = OffchainDisabledValidators::default(); Self { keystore, runtime_info, + offchain_disabled_validators, highest_session_seen, gaps_in_cache, spam_slots, @@ -319,13 +325,16 @@ impl Initialized { self.runtime_info.pin_block(session_idx, new_leaf.unpin_handle); // Fetch the last `DISPUTE_WINDOW` number of sessions unless there are no gaps // in cache and we are not missing too many `SessionInfo`s - let mut lower_bound = session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1); - if !self.gaps_in_cache && self.highest_session_seen > lower_bound { - lower_bound = self.highest_session_seen + 1 - } + let prune_up_to = session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1); + let fetch_lower_bound = + if !self.gaps_in_cache && self.highest_session_seen > prune_up_to { + self.highest_session_seen + 1 + } else { + prune_up_to + }; // There is a new session. Perform a dummy fetch to cache it. - for idx in lower_bound..=session_idx { + for idx in fetch_lower_bound..=session_idx { if let Err(err) = self .runtime_info .get_session_info_by_index(ctx.sender(), new_leaf.hash, idx) @@ -344,11 +353,9 @@ impl Initialized { self.highest_session_seen = session_idx; - db::v1::note_earliest_session( - overlay_db, - session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1), - )?; - self.spam_slots.prune_old(session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)); + db::v1::note_earliest_session(overlay_db, prune_up_to)?; + self.spam_slots.prune_old(prune_up_to); + self.offchain_disabled_validators.prune_old(prune_up_to); }, Ok(_) => { /* no new session => nothing to cache */ }, Err(err) => { @@ -978,11 +985,13 @@ impl Initialized { Some(env) => env, }; + let n_validators = env.validators().len(); + gum::trace!( target: LOG_TARGET, ?candidate_hash, ?session, - num_validators = ?env.session_info().validators.len(), + ?n_validators, "Number of validators" ); @@ -1084,18 +1093,42 @@ impl Initialized { target: LOG_TARGET, ?candidate_hash, ?session, - num_validators = ?env.session_info().validators.len(), + ?n_validators, "Import result ready" ); + let new_state = import_result.new_state(); + let byzantine_threshold = polkadot_primitives::byzantine_threshold(n_validators); + // combine on-chain with off-chain disabled validators + // process disabled validators in the following order: + // - on-chain disabled validators + // - prioritized order of off-chain disabled validators + // deduplicate the list and take at most `byzantine_threshold` validators + let disabled_validators = { + let mut d: HashSet = HashSet::new(); + for v in env + .disabled_indices() + .iter() + .cloned() + .chain(self.offchain_disabled_validators.iter(session)) + { + if d.len() == byzantine_threshold { + break + } + d.insert(v); + } + d + }; + let is_included = self.scraper.is_candidate_included(&candidate_hash); let is_backed = self.scraper.is_candidate_backed(&candidate_hash); let own_vote_missing = new_state.own_vote_missing(); let is_disputed = new_state.is_disputed(); let is_confirmed = new_state.is_confirmed(); - let potential_spam = is_potential_spam(&self.scraper, &new_state, &candidate_hash); - // We participate only in disputes which are not potential spam. + let potential_spam = is_potential_spam(&self.scraper, &new_state, &candidate_hash, |v| { + disabled_validators.contains(v) + }); let allow_participation = !potential_spam; gum::trace!( @@ -1106,6 +1139,7 @@ impl Initialized { ?candidate_hash, confirmed = ?new_state.is_confirmed(), has_invalid_voters = ?!import_result.new_invalid_voters().is_empty(), + n_disabled_validators = ?disabled_validators.len(), "Is spam?" ); @@ -1337,6 +1371,10 @@ impl Initialized { ); } } + for validator_index in new_state.votes().invalid.keys() { + self.offchain_disabled_validators + .insert_against_valid(session, *validator_index); + } self.metrics.on_concluded_valid(); } if import_result.is_freshly_concluded_against() { @@ -1356,6 +1394,14 @@ impl Initialized { ); } } + for (validator_index, (kind, _sig)) in new_state.votes().valid.raw() { + let is_backer = kind.is_backing(); + self.offchain_disabled_validators.insert_for_invalid( + session, + *validator_index, + is_backer, + ); + } self.metrics.on_concluded_invalid(); } @@ -1591,3 +1637,82 @@ fn determine_undisputed_chain( Ok(last) } + +#[derive(Default)] +struct OffchainDisabledValidators { + // Ideally, we want to use the top `byzantine_threshold` offenders here based on the amount of + // stake slashed. However, given that slashing might be applied with a delay, we want to have + // some list of offenders as soon as disputes conclude offchain. This list only approximates + // the top offenders and only accounts for lost disputes. But that should be good enough to + // prevent spam attacks. + per_session: BTreeMap, +} + +struct LostSessionDisputes { + // We separate lost disputes to prioritize "for invalid" offenders. And among those, we + // prioritize backing votes the most. There's no need to limit the size of these sets, as they + // are already limited by the number of validators in the session. We use `LruMap` to ensure + // the iteration order prioritizes most recently disputes lost over older ones in case we reach + // the limit. + backers_for_invalid: LruMap, + for_invalid: LruMap, + against_valid: LruMap, +} + +impl Default for LostSessionDisputes { + fn default() -> Self { + Self { + backers_for_invalid: LruMap::new(UnlimitedCompact), + for_invalid: LruMap::new(UnlimitedCompact), + against_valid: LruMap::new(UnlimitedCompact), + } + } +} + +impl OffchainDisabledValidators { + fn prune_old(&mut self, up_to_excluding: SessionIndex) { + // split_off returns everything after the given key, including the key. + let mut relevant = self.per_session.split_off(&up_to_excluding); + std::mem::swap(&mut relevant, &mut self.per_session); + } + + fn insert_for_invalid( + &mut self, + session_index: SessionIndex, + validator_index: ValidatorIndex, + is_backer: bool, + ) { + let entry = self.per_session.entry(session_index).or_default(); + if is_backer { + entry.backers_for_invalid.insert(validator_index, ()); + } else { + entry.for_invalid.insert(validator_index, ()); + } + } + + fn insert_against_valid( + &mut self, + session_index: SessionIndex, + validator_index: ValidatorIndex, + ) { + self.per_session + .entry(session_index) + .or_default() + .against_valid + .insert(validator_index, ()); + } + + /// Iterate over all validators that are offchain disabled. + /// The order of iteration prioritizes `for_invalid` offenders (and backers among those) over + /// `against_valid` offenders. And most recently lost disputes over older ones. + /// NOTE: the iterator might contain duplicates. + fn iter(&self, session_index: SessionIndex) -> impl Iterator + '_ { + self.per_session.get(&session_index).into_iter().flat_map(|e| { + e.backers_for_invalid + .iter() + .chain(e.for_invalid.iter()) + .chain(e.against_valid.iter()) + .map(|(i, _)| *i) + }) + } +} diff --git a/polkadot/node/core/dispute-coordinator/src/lib.rs b/polkadot/node/core/dispute-coordinator/src/lib.rs index 5067d3673da9..c3038fc0953c 100644 --- a/polkadot/node/core/dispute-coordinator/src/lib.rs +++ b/polkadot/node/core/dispute-coordinator/src/lib.rs @@ -370,8 +370,10 @@ impl DisputeCoordinatorSubsystem { }, }; let vote_state = CandidateVoteState::new(votes, &env, now); - - let potential_spam = is_potential_spam(&scraper, &vote_state, candidate_hash); + let onchain_disabled = env.disabled_indices(); + let potential_spam = is_potential_spam(&scraper, &vote_state, candidate_hash, |v| { + onchain_disabled.contains(v) + }); let is_included = scraper.is_candidate_included(&vote_state.votes().candidate_receipt.hash()); @@ -462,17 +464,20 @@ async fn wait_for_first_leaf(ctx: &mut Context) -> Result( +pub fn is_potential_spam( scraper: &ChainScraper, - vote_state: &CandidateVoteState, + vote_state: &CandidateVoteState, candidate_hash: &CandidateHash, + is_disabled: impl FnMut(&ValidatorIndex) -> bool, ) -> bool { let is_disputed = vote_state.is_disputed(); let is_included = scraper.is_candidate_included(candidate_hash); let is_backed = scraper.is_candidate_backed(candidate_hash); let is_confirmed = vote_state.is_confirmed(); + let all_invalid_votes_disabled = vote_state.invalid_votes_all_disabled(is_disabled); + let ignore_disabled = !is_confirmed && all_invalid_votes_disabled; - is_disputed && !is_included && !is_backed && !is_confirmed + (is_disputed && !is_included && !is_backed && !is_confirmed) || ignore_disabled } /// Tell dispute-distribution to send all our votes. diff --git a/polkadot/node/core/dispute-coordinator/src/participation/tests.rs b/polkadot/node/core/dispute-coordinator/src/participation/tests.rs index 012df51d0cd3..367454115f0b 100644 --- a/polkadot/node/core/dispute-coordinator/src/participation/tests.rs +++ b/polkadot/node/core/dispute-coordinator/src/participation/tests.rs @@ -372,7 +372,6 @@ fn cannot_participate_if_cannot_recover_validation_code() { let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); - recover_available_data(&mut ctx_handle).await; assert_matches!( diff --git a/polkadot/node/core/dispute-coordinator/src/tests.rs b/polkadot/node/core/dispute-coordinator/src/tests.rs index da449773fe8f..af384256c4f7 100644 --- a/polkadot/node/core/dispute-coordinator/src/tests.rs +++ b/polkadot/node/core/dispute-coordinator/src/tests.rs @@ -257,7 +257,7 @@ impl TestState { session: SessionIndex, block_number: BlockNumber, candidate_events: Vec, - ) { + ) -> Hash { assert!(block_number > 0); let block_header = Header { @@ -282,6 +282,8 @@ impl TestState { self.handle_sync_queries(virtual_overseer, block_hash, session, candidate_events) .await; + + block_hash } /// Returns any sent `DisputeMessage`s. @@ -406,6 +408,19 @@ impl TestState { )) => { tx.send(Ok(Vec::new())).unwrap(); }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _new_leaf, + RuntimeApiRequest::Version(tx), + )) => { + tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT)) + .unwrap(); + }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _new_leaf, + RuntimeApiRequest::DisabledValidators(tx), + )) => { + tx.send(Ok(Vec::new())).unwrap(); + }, AllMessages::ChainApi(ChainApiMessage::Ancestors { hash, k, response_channel }) => { let target_header = self .headers @@ -628,15 +643,19 @@ async fn participation_with_distribution( } fn make_valid_candidate_receipt() -> CandidateReceipt { - let mut candidate_receipt = dummy_candidate_receipt_bad_sig(dummy_hash(), dummy_hash()); - candidate_receipt.commitments_hash = CandidateCommitments::default().hash(); - candidate_receipt + make_another_valid_candidate_receipt(dummy_hash()) } fn make_invalid_candidate_receipt() -> CandidateReceipt { dummy_candidate_receipt_bad_sig(Default::default(), Some(Default::default())) } +fn make_another_valid_candidate_receipt(relay_parent: Hash) -> CandidateReceipt { + let mut candidate_receipt = dummy_candidate_receipt_bad_sig(relay_parent, dummy_hash()); + candidate_receipt.commitments_hash = CandidateCommitments::default().hash(); + candidate_receipt +} + // Generate a `CandidateBacked` event from a `CandidateReceipt`. The rest is dummy data. fn make_candidate_backed_event(candidate_receipt: CandidateReceipt) -> CandidateEvent { CandidateEvent::CandidateBacked( @@ -740,6 +759,7 @@ fn too_many_unconfirmed_statements_are_considered_spam() { .await; gum::trace!("After sending `ImportStatements`"); + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash1, HashMap::new()) .await; @@ -875,6 +895,7 @@ fn approval_vote_import_works() { .into_iter() .collect(); + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash1, approval_votes) .await; @@ -982,6 +1003,7 @@ fn dispute_gets_confirmed_via_participation() { }) .await; gum::debug!("After First import!"); + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash1, HashMap::new()) .await; @@ -1131,6 +1153,7 @@ fn dispute_gets_confirmed_at_byzantine_threshold() { }, }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash1, HashMap::new()) .await; @@ -1255,6 +1278,7 @@ fn backing_statements_import_works_and_no_spam() { }, }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; assert_matches!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); { @@ -1387,6 +1411,7 @@ fn conflicting_votes_lead_to_dispute_participation() { }, }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; @@ -1506,6 +1531,7 @@ fn positive_votes_dont_trigger_participation() { }, }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; { let (tx, rx) = oneshot::channel(); @@ -1616,6 +1642,7 @@ fn wrong_validator_index_is_ignored() { }, }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; { let (tx, rx) = oneshot::channel(); @@ -1693,6 +1720,7 @@ fn finality_votes_ignore_disputed_candidates() { }, }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; @@ -1769,14 +1797,10 @@ fn supermajority_valid_dispute_may_be_finalized() { let candidate_receipt = make_valid_candidate_receipt(); let candidate_hash = candidate_receipt.hash(); + let candidate_events = vec![make_candidate_backed_event(candidate_receipt.clone())]; test_state - .activate_leaf_at_session( - &mut virtual_overseer, - session, - 1, - vec![make_candidate_backed_event(candidate_receipt.clone())], - ) + .activate_leaf_at_session(&mut virtual_overseer, session, 1, candidate_events) .await; let supermajority_threshold = @@ -1805,6 +1829,7 @@ fn supermajority_valid_dispute_may_be_finalized() { }, }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; @@ -1942,6 +1967,7 @@ fn concluded_supermajority_for_non_active_after_time() { }, }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; @@ -2058,6 +2084,7 @@ fn concluded_supermajority_against_non_active_after_time() { }, }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; assert_matches!(confirmation_rx.await.unwrap(), @@ -2173,6 +2200,7 @@ fn resume_dispute_without_local_statement() { }, }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; @@ -2217,13 +2245,23 @@ fn resume_dispute_without_local_statement() { let candidate_receipt = make_valid_candidate_receipt(); let candidate_hash = candidate_receipt.hash(); - participation_with_distribution( + participation_full_happy_path( &mut virtual_overseer, - &candidate_hash, candidate_receipt.commitments_hash, ) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeDistribution( + DisputeDistributionMessage::SendDispute(msg) + ) => { + assert_eq!(msg.candidate_receipt().hash(), candidate_hash); + } + ); + let mut statements = Vec::new(); // Getting votes for supermajority. Should already have two valid votes. for i in vec![3, 4, 5, 6, 7] { @@ -2328,6 +2366,7 @@ fn resume_dispute_with_local_statement() { }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; @@ -2425,6 +2464,7 @@ fn resume_dispute_without_local_statement_or_local_key() { }, }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request( &mut virtual_overseer, &candidate_hash, @@ -2516,6 +2556,7 @@ fn issue_local_statement_does_cause_distribution_but_not_duplicate_participation }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); // Initiate dispute locally: @@ -2556,7 +2597,7 @@ fn issue_local_statement_does_cause_distribution_but_not_duplicate_participation } #[test] -fn own_approval_vote_gets_distributed_on_dispute() { +fn participation_with_onchain_disabling_unconfirmed() { test_harness(|mut test_state, mut virtual_overseer| { Box::pin(async move { let session = 1; @@ -2565,126 +2606,116 @@ fn own_approval_vote_gets_distributed_on_dispute() { let candidate_receipt = make_valid_candidate_receipt(); let candidate_hash = candidate_receipt.hash(); + let events = vec![make_candidate_included_event(candidate_receipt.clone())]; test_state - .activate_leaf_at_session(&mut virtual_overseer, session, 1, Vec::new()) + .activate_leaf_at_session(&mut virtual_overseer, session, 1, events) .await; - let statement = test_state.issue_approval_vote_with_index( - ValidatorIndex(0), - candidate_hash, - session, - ); - - // Import our approval vote: - virtual_overseer - .send(FromOrchestra::Communication { - msg: DisputeCoordinatorMessage::ImportStatements { - candidate_receipt: candidate_receipt.clone(), - session, - statements: vec![(statement, ValidatorIndex(0))], - pending_confirmation: None, - }, - }) - .await; + let backer_index = ValidatorIndex(1); + let disabled_index = ValidatorIndex(2); - // Trigger dispute: let (valid_vote, invalid_vote) = generate_opposing_votes_pair( &test_state, - ValidatorIndex(2), - ValidatorIndex(1), + backer_index, + disabled_index, candidate_hash, session, - VoteType::Explicit, + VoteType::Backing, ) .await; let (pending_confirmation, confirmation_rx) = oneshot::channel(); + let pending_confirmation = Some(pending_confirmation); + + // Scenario 1: unconfirmed dispute with onchain disabled validator against. + // Expectation: we import the vote, but do not participate. virtual_overseer .send(FromOrchestra::Communication { msg: DisputeCoordinatorMessage::ImportStatements { candidate_receipt: candidate_receipt.clone(), session, statements: vec![ - (invalid_vote, ValidatorIndex(1)), - (valid_vote, ValidatorIndex(2)), + (valid_vote, backer_index), + (invalid_vote, disabled_index), ], - pending_confirmation: Some(pending_confirmation), + pending_confirmation, }, }) .await; + + handle_disabled_validators_queries(&mut virtual_overseer, vec![disabled_index]).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); - // Dispute distribution should get notified now (without participation, as we already - // have an approval vote): - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::DisputeDistribution( - DisputeDistributionMessage::SendDispute(msg) - ) => { - assert_eq!(msg.session_index(), session); - assert_eq!(msg.candidate_receipt(), &candidate_receipt); - } - ); - - // No participation should occur: - assert_matches!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await, None); - - virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; - assert!(virtual_overseer.try_recv().await.is_none()); - - test_state - }) - }); -} - -#[test] -fn negative_issue_local_statement_only_triggers_import() { - test_harness(|mut test_state, mut virtual_overseer| { - Box::pin(async move { - let session = 1; - - test_state.handle_resume_sync(&mut virtual_overseer, session).await; + // we should not participate due to disabled indices on chain + assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); - let candidate_receipt = make_invalid_candidate_receipt(); - let candidate_hash = candidate_receipt.hash(); + // Scenario 2: unconfirmed dispute with non-disabled validator against. + // Expectation: even if the dispute is unconfirmed, we should participate + // once we receive an invalid vote from a non-disabled validator. + let non_disabled_index = ValidatorIndex(3); + let vote = test_state.issue_explicit_statement_with_index( + non_disabled_index, + candidate_hash, + session, + false, + ); + let statements = vec![(vote, non_disabled_index)]; - test_state - .activate_leaf_at_session(&mut virtual_overseer, session, 1, Vec::new()) - .await; + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + let pending_confirmation = Some(pending_confirmation); virtual_overseer .send(FromOrchestra::Communication { - msg: DisputeCoordinatorMessage::IssueLocalStatement( + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: candidate_receipt.clone(), session, - candidate_hash, - candidate_receipt.clone(), - false, - ), + statements, + pending_confirmation, + }, }) .await; - // Assert that subsystem is not participating. - assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); - virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; - assert!(virtual_overseer.try_recv().await.is_none()); + participation_with_distribution( + &mut virtual_overseer, + &candidate_hash, + candidate_receipt.commitments_hash, + ) + .await; - let backend = DbBackend::new( - test_state.db.clone(), - test_state.config.column_config(), - Metrics::default(), - ); + { + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }) + .await; - let votes = backend.load_candidate_votes(session, &candidate_hash).unwrap().unwrap(); - assert_eq!(votes.invalid.len(), 1); - assert_eq!(votes.valid.len(), 0); + assert_eq!(rx.await.unwrap().len(), 1); - let disputes = backend.load_recent_disputes().unwrap(); - assert_eq!(disputes, None); + // check if we have participated (cast a vote) + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::QueryCandidateVotes( + vec![(session, candidate_hash)], + tx, + ), + }) + .await; + + let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone(); + assert_eq!(votes.valid.raw().len(), 2); // 3+1 => we have participated + assert_eq!(votes.invalid.len(), 2); + } + + virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); test_state }) @@ -2692,7 +2723,7 @@ fn negative_issue_local_statement_only_triggers_import() { } #[test] -fn redundant_votes_ignored() { +fn participation_with_onchain_disabling_confirmed() { test_harness(|mut test_state, mut virtual_overseer| { Box::pin(async move { let session = 1; @@ -2701,63 +2732,95 @@ fn redundant_votes_ignored() { let candidate_receipt = make_valid_candidate_receipt(); let candidate_hash = candidate_receipt.hash(); + let events = vec![make_candidate_included_event(candidate_receipt.clone())]; test_state - .activate_leaf_at_session(&mut virtual_overseer, session, 1, Vec::new()) + .activate_leaf_at_session(&mut virtual_overseer, session, 1, events) .await; - let valid_vote = test_state.issue_backing_statement_with_index( - ValidatorIndex(1), - candidate_hash, - session, - ); + let backer_index = ValidatorIndex(1); + let disabled_index = ValidatorIndex(2); - let valid_vote_2 = test_state.issue_backing_statement_with_index( - ValidatorIndex(1), + // Scenario 1: confirmed dispute with disabled validator + // Expectation: we import the vote and participate. + let mut statements = Vec::new(); + + let (valid_vote, invalid_vote) = generate_opposing_votes_pair( + &test_state, + backer_index, + disabled_index, candidate_hash, session, - ); + VoteType::Backing, + ) + .await; - assert!(valid_vote.validator_signature() != valid_vote_2.validator_signature()); + statements.push((valid_vote, backer_index)); + statements.push((invalid_vote, disabled_index)); - let (tx, rx) = oneshot::channel(); - virtual_overseer - .send(FromOrchestra::Communication { - msg: DisputeCoordinatorMessage::ImportStatements { - candidate_receipt: candidate_receipt.clone(), - session, - statements: vec![(valid_vote.clone(), ValidatorIndex(1))], - pending_confirmation: Some(tx), - }, - }) - .await; + // now import enough votes for dispute confirmation + for i in vec![3, 4] { + let vote = test_state.issue_explicit_statement_with_index( + ValidatorIndex(i), + candidate_hash, + session, + true, + ); - rx.await.unwrap(); + statements.push((vote, ValidatorIndex(i as _))); + } + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + let pending_confirmation = Some(pending_confirmation); - let (tx, rx) = oneshot::channel(); virtual_overseer .send(FromOrchestra::Communication { msg: DisputeCoordinatorMessage::ImportStatements { candidate_receipt: candidate_receipt.clone(), session, - statements: vec![(valid_vote_2, ValidatorIndex(1))], - pending_confirmation: Some(tx), + statements, + pending_confirmation, }, }) .await; - rx.await.unwrap(); + handle_disabled_validators_queries(&mut virtual_overseer, vec![]).await; + handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) + .await; + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); - let backend = DbBackend::new( - test_state.db.clone(), - test_state.config.column_config(), - Metrics::default(), - ); + participation_with_distribution( + &mut virtual_overseer, + &candidate_hash, + candidate_receipt.commitments_hash, + ) + .await; - let votes = backend.load_candidate_votes(session, &candidate_hash).unwrap().unwrap(); - assert_eq!(votes.invalid.len(), 0); - assert_eq!(votes.valid.len(), 1); - assert_eq!(&votes.valid[0].2, valid_vote.validator_signature()); + { + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }) + .await; + + assert_eq!(rx.await.unwrap().len(), 1); + + // check if we have participated (cast a vote) + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::QueryCandidateVotes( + vec![(session, candidate_hash)], + tx, + ), + }) + .await; + + let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone(); + assert_eq!(votes.valid.raw().len(), 4); // 3+1 => we have participated + assert_eq!(votes.invalid.len(), 1); + } virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; assert!(virtual_overseer.try_recv().await.is_none()); @@ -2768,9 +2831,7 @@ fn redundant_votes_ignored() { } #[test] -/// Make sure no disputes are recorded when there are no opposing votes, even if we reached -/// supermajority. -fn no_onesided_disputes() { +fn participation_with_offchain_disabling() { test_harness(|mut test_state, mut virtual_overseer| { Box::pin(async move { let session = 1; @@ -2779,13 +2840,641 @@ fn no_onesided_disputes() { let candidate_receipt = make_valid_candidate_receipt(); let candidate_hash = candidate_receipt.hash(); + let events = vec![make_candidate_included_event(candidate_receipt.clone())]; + + let block_hash = test_state + .activate_leaf_at_session(&mut virtual_overseer, session, 3, events) + .await; + + let another_candidate_receipt = make_another_valid_candidate_receipt(block_hash); + let another_candidate_hash = another_candidate_receipt.hash(); + let another_events = + vec![make_candidate_included_event(another_candidate_receipt.clone())]; + test_state - .activate_leaf_at_session(&mut virtual_overseer, session, 1, Vec::new()) + .activate_leaf_at_session(&mut virtual_overseer, session, 4, another_events) .await; + // import enough votes for supermajority to conclude the dispute let mut statements = Vec::new(); - for index in 1..10 { - statements.push(( + let (valid_vote, invalid_vote) = generate_opposing_votes_pair( + &test_state, + ValidatorIndex(1), + ValidatorIndex(2), + candidate_hash, + session, + VoteType::Backing, + ) + .await; + + statements.push((valid_vote, ValidatorIndex(1))); + statements.push((invalid_vote, ValidatorIndex(2))); + + for i in vec![3, 4, 5, 6, 7, 8] { + let vote = test_state.issue_explicit_statement_with_index( + ValidatorIndex(i), + candidate_hash, + session, + true, + ); + + statements.push((vote, ValidatorIndex(i as _))); + } + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + let pending_confirmation = Some(pending_confirmation); + + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: candidate_receipt.clone(), + session, + statements, + pending_confirmation, + }, + }) + .await; + + handle_disabled_validators_queries(&mut virtual_overseer, vec![]).await; + handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) + .await; + + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + participation_with_distribution( + &mut virtual_overseer, + &candidate_hash, + candidate_receipt.commitments_hash, + ) + .await; + + { + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }) + .await; + + assert_eq!(rx.await.unwrap().len(), 1); + + // check if we have participated (cast a vote) + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::QueryCandidateVotes( + vec![(session, candidate_hash)], + tx, + ), + }) + .await; + + let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone(); + assert_eq!(votes.valid.raw().len(), 8); // 8 => we have participated + assert_eq!(votes.invalid.len(), 1); + } + + // now create another dispute + // Validator 2 should be disabled offchain now + + let mut statements = Vec::new(); + let (valid_vote, invalid_vote) = generate_opposing_votes_pair( + &test_state, + ValidatorIndex(1), + ValidatorIndex(2), + another_candidate_hash, + session, + VoteType::Backing, + ) + .await; + + statements.push((valid_vote, ValidatorIndex(1))); + statements.push((invalid_vote, ValidatorIndex(2))); + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + let pending_confirmation = Some(pending_confirmation); + + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: another_candidate_receipt.clone(), + session, + statements, + pending_confirmation, + }, + }) + .await; + + // let's disable validators 3, 4 on chain, but this should not affect this import + let disabled_validators = vec![ValidatorIndex(3), ValidatorIndex(4)]; + handle_disabled_validators_queries(&mut virtual_overseer, disabled_validators).await; + handle_approval_vote_request( + &mut virtual_overseer, + &another_candidate_hash, + HashMap::new(), + ) + .await; + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + // we should not participate since due to offchain disabling + assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); + + // now import enough votes for dispute confirmation + // even though all of these votes are from (on chain) disabled validators + let mut statements = Vec::new(); + for i in vec![3, 4] { + let vote = test_state.issue_explicit_statement_with_index( + ValidatorIndex(i), + another_candidate_hash, + session, + true, + ); + + statements.push((vote, ValidatorIndex(i as _))); + } + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + let pending_confirmation = Some(pending_confirmation); + + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: another_candidate_receipt.clone(), + session, + statements, + pending_confirmation, + }, + }) + .await; + + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + participation_with_distribution( + &mut virtual_overseer, + &another_candidate_hash, + another_candidate_receipt.commitments_hash, + ) + .await; + + { + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }) + .await; + + assert_eq!(rx.await.unwrap().len(), 2); + + // check if we have participated (cast a vote) + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::QueryCandidateVotes( + vec![(session, another_candidate_hash)], + tx, + ), + }) + .await; + + let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone(); + assert_eq!(votes.valid.raw().len(), 4); // 3+1 => we have participated + assert_eq!(votes.invalid.len(), 1); + } + + virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + }) + }); +} + +// Once the onchain disabling reaches the byzantine threshold, +// offchain disabling will no longer take any effect. +#[test] +fn participation_with_disabling_limits() { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session = 1; + + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = make_valid_candidate_receipt(); + let candidate_hash = candidate_receipt.hash(); + let events = vec![make_candidate_included_event(candidate_receipt.clone())]; + + let block_hash = test_state + .activate_leaf_at_session(&mut virtual_overseer, session, 3, events) + .await; + + let another_candidate_receipt = make_another_valid_candidate_receipt(block_hash); + let another_candidate_hash = another_candidate_receipt.hash(); + let another_events = + vec![make_candidate_included_event(another_candidate_receipt.clone())]; + + test_state + .activate_leaf_at_session(&mut virtual_overseer, session, 4, another_events) + .await; + + // import enough votes for supermajority to conclude the dispute + let mut statements = Vec::new(); + let (valid_vote, invalid_vote) = generate_opposing_votes_pair( + &test_state, + ValidatorIndex(1), + ValidatorIndex(2), + candidate_hash, + session, + VoteType::Backing, + ) + .await; + + statements.push((valid_vote, ValidatorIndex(1))); + statements.push((invalid_vote, ValidatorIndex(2))); + + for i in vec![3, 4, 5, 6, 7, 8] { + let vote = test_state.issue_explicit_statement_with_index( + ValidatorIndex(i), + candidate_hash, + session, + true, + ); + + statements.push((vote, ValidatorIndex(i as _))); + } + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + let pending_confirmation = Some(pending_confirmation); + + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: candidate_receipt.clone(), + session, + statements, + pending_confirmation, + }, + }) + .await; + + handle_disabled_validators_queries(&mut virtual_overseer, vec![]).await; + handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) + .await; + + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + participation_with_distribution( + &mut virtual_overseer, + &candidate_hash, + candidate_receipt.commitments_hash, + ) + .await; + + { + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }) + .await; + + assert_eq!(rx.await.unwrap().len(), 1); + + // check if we have participated (cast a vote) + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::QueryCandidateVotes( + vec![(session, candidate_hash)], + tx, + ), + }) + .await; + + let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone(); + assert_eq!(votes.valid.raw().len(), 8); // 8 => we have participated + assert_eq!(votes.invalid.len(), 1); + } + + // now create another dispute + // validator 2 should be disabled offchain now + // but due to the byzantine threshold of onchain disabling + // this validator will be considered enabled + + let mut statements = Vec::new(); + let (valid_vote, invalid_vote) = generate_opposing_votes_pair( + &test_state, + ValidatorIndex(1), + ValidatorIndex(2), + another_candidate_hash, + session, + VoteType::Backing, + ) + .await; + + statements.push((valid_vote, ValidatorIndex(1))); + statements.push((invalid_vote, ValidatorIndex(2))); + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + let pending_confirmation = Some(pending_confirmation); + + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: another_candidate_receipt.clone(), + session, + statements, + pending_confirmation, + }, + }) + .await; + + // let's disable validators 3, 4, 5 on chain, reaching the byzantine threshold + let disabled_validators = vec![ValidatorIndex(3), ValidatorIndex(4), ValidatorIndex(5)]; + handle_disabled_validators_queries(&mut virtual_overseer, disabled_validators).await; + handle_approval_vote_request( + &mut virtual_overseer, + &another_candidate_hash, + HashMap::new(), + ) + .await; + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + participation_with_distribution( + &mut virtual_overseer, + &another_candidate_hash, + another_candidate_receipt.commitments_hash, + ) + .await; + + { + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }) + .await; + + assert_eq!(rx.await.unwrap().len(), 2); + + // check if we have participated (cast a vote) + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::QueryCandidateVotes( + vec![(session, another_candidate_hash)], + tx, + ), + }) + .await; + + let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone(); + assert_eq!(votes.valid.raw().len(), 2); // 2 => we have participated + assert_eq!(votes.invalid.len(), 1); + } + + virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + }) + }); +} + +#[test] +fn own_approval_vote_gets_distributed_on_dispute() { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session = 1; + + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = make_valid_candidate_receipt(); + let candidate_hash = candidate_receipt.hash(); + + test_state + .activate_leaf_at_session(&mut virtual_overseer, session, 1, Vec::new()) + .await; + + let statement = test_state.issue_approval_vote_with_index( + ValidatorIndex(0), + candidate_hash, + session, + ); + + // Import our approval vote: + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![(statement, ValidatorIndex(0))], + pending_confirmation: None, + }, + }) + .await; + + // Trigger dispute: + let (valid_vote, invalid_vote) = generate_opposing_votes_pair( + &test_state, + ValidatorIndex(2), + ValidatorIndex(1), + candidate_hash, + session, + VoteType::Explicit, + ) + .await; + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (invalid_vote, ValidatorIndex(1)), + (valid_vote, ValidatorIndex(2)), + ], + pending_confirmation: Some(pending_confirmation), + }, + }) + .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; + handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) + .await; + + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + // Dispute distribution should get notified now (without participation, as we already + // have an approval vote): + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::DisputeDistribution( + DisputeDistributionMessage::SendDispute(msg) + ) => { + assert_eq!(msg.session_index(), session); + assert_eq!(msg.candidate_receipt(), &candidate_receipt); + } + ); + + // No participation should occur: + assert_matches!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await, None); + + virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + }) + }); +} + +#[test] +fn negative_issue_local_statement_only_triggers_import() { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session = 1; + + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = make_invalid_candidate_receipt(); + let candidate_hash = candidate_receipt.hash(); + + test_state + .activate_leaf_at_session(&mut virtual_overseer, session, 1, Vec::new()) + .await; + + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::IssueLocalStatement( + session, + candidate_hash, + candidate_receipt.clone(), + false, + ), + }) + .await; + + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; + // Assert that subsystem is not participating. + assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); + + virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + let backend = DbBackend::new( + test_state.db.clone(), + test_state.config.column_config(), + Metrics::default(), + ); + + let votes = backend.load_candidate_votes(session, &candidate_hash).unwrap().unwrap(); + assert_eq!(votes.invalid.len(), 1); + assert_eq!(votes.valid.len(), 0); + + let disputes = backend.load_recent_disputes().unwrap(); + assert_eq!(disputes, None); + + test_state + }) + }); +} + +#[test] +fn redundant_votes_ignored() { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session = 1; + + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = make_valid_candidate_receipt(); + let candidate_hash = candidate_receipt.hash(); + + test_state + .activate_leaf_at_session(&mut virtual_overseer, session, 1, Vec::new()) + .await; + + let valid_vote = test_state.issue_backing_statement_with_index( + ValidatorIndex(1), + candidate_hash, + session, + ); + + let valid_vote_2 = test_state.issue_backing_statement_with_index( + ValidatorIndex(1), + candidate_hash, + session, + ); + + assert!(valid_vote.validator_signature() != valid_vote_2.validator_signature()); + + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![(valid_vote.clone(), ValidatorIndex(1))], + pending_confirmation: Some(tx), + }, + }) + .await; + + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; + rx.await.unwrap(); + + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![(valid_vote_2, ValidatorIndex(1))], + pending_confirmation: Some(tx), + }, + }) + .await; + + rx.await.unwrap(); + + let backend = DbBackend::new( + test_state.db.clone(), + test_state.config.column_config(), + Metrics::default(), + ); + + let votes = backend.load_candidate_votes(session, &candidate_hash).unwrap().unwrap(); + assert_eq!(votes.invalid.len(), 0); + assert_eq!(votes.valid.len(), 1); + assert_eq!(&votes.valid[0].2, valid_vote.validator_signature()); + + virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + }) + }); +} + +#[test] +/// Make sure no disputes are recorded when there are no opposing votes, even if we reached +/// supermajority. +fn no_onesided_disputes() { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session = 1; + + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = make_valid_candidate_receipt(); + let candidate_hash = candidate_receipt.hash(); + test_state + .activate_leaf_at_session(&mut virtual_overseer, session, 1, Vec::new()) + .await; + + let mut statements = Vec::new(); + for index in 1..10 { + statements.push(( test_state.issue_backing_statement_with_index( ValidatorIndex(index), candidate_hash, @@ -2806,6 +3495,7 @@ fn no_onesided_disputes() { }, }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; assert_matches!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); // We should not have any active disputes now. @@ -2869,6 +3559,7 @@ fn refrain_from_participation() { }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; @@ -2961,6 +3652,7 @@ fn participation_for_included_candidates() { }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; @@ -3049,6 +3741,7 @@ fn local_participation_in_dispute_for_backed_candidate() { }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; @@ -3190,6 +3883,7 @@ fn participation_requests_reprioritized_for_newly_included() { }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; // Handle corresponding messages to unblock import // we need to handle `ApprovalVotingMessage::GetApprovalSignaturesForCandidate` for // import @@ -3343,6 +4037,7 @@ fn informs_chain_selection_when_dispute_concluded_against() { }, }) .await; + handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await; handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; assert_matches!(confirmation_rx.await.unwrap(), @@ -3655,3 +4350,27 @@ fn session_info_small_jump_works() { }) }); } + +async fn handle_disabled_validators_queries( + virtual_overseer: &mut VirtualOverseer, + disabled_validators: Vec, +) { + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _new_leaf, + RuntimeApiRequest::Version(tx), + )) => { + tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT)).unwrap(); + } + ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _new_leaf, + RuntimeApiRequest::DisabledValidators(tx), + )) => { + tx.send(Ok(disabled_validators)).unwrap(); + } + ); +} diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 1fe52767df61..f13beb3502fc 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -141,6 +141,20 @@ impl From for Error { } } +impl TryFrom for Error { + type Error = (); + + fn try_from(e: crate::runtime::Error) -> Result { + use crate::runtime::Error; + + match e { + Error::RuntimeRequestCanceled(e) => Ok(Self::Oneshot(e)), + Error::RuntimeRequest(e) => Ok(Self::RuntimeApi(e)), + Error::NoSuchSession(_) | Error::NoExecutorParams(_) => Err(()), + } + } +} + /// A type alias for Runtime API receivers. pub type RuntimeApiReceiver = oneshot::Receiver>; @@ -465,7 +479,9 @@ impl Validator { // TODO: https://github.com/paritytech/polkadot-sdk/issues/1940 // When `DisabledValidators` is released remove this and add a // `request_disabled_validators` call here - let disabled_validators = get_disabled_validators_with_fallback(sender, parent).await?; + let disabled_validators = get_disabled_validators_with_fallback(sender, parent) + .await + .map_err(|e| Error::try_from(e).expect("the conversion is infallible; qed"))?; Self::construct(&validators, &disabled_validators, signing_context, keystore) } diff --git a/polkadot/node/subsystem-util/src/runtime/mod.rs b/polkadot/node/subsystem-util/src/runtime/mod.rs index 0e44423b4e34..481625acb321 100644 --- a/polkadot/node/subsystem-util/src/runtime/mod.rs +++ b/polkadot/node/subsystem-util/src/runtime/mod.rs @@ -43,7 +43,7 @@ use crate::{ request_from_runtime, request_key_ownership_proof, request_on_chain_votes, request_session_executor_params, request_session_index_for_child, request_session_info, request_submit_report_dispute_lost, request_unapplied_slashes, request_validation_code_by_hash, - request_validator_groups, + request_validator_groups, vstaging::get_disabled_validators_with_fallback, }; /// Errors that can happen on runtime fetches. @@ -75,6 +75,11 @@ pub struct RuntimeInfo { /// overseer seems sensible. session_index_cache: LruMap, + /// In the happy case, we do not query disabled validators at all. In the worst case, we can + /// query it order of `n_cores` times `n_validators` per block, so caching it here seems + /// sensible. + disabled_validators_cache: LruMap>, + /// Look up cached sessions by `SessionIndex`. session_info_cache: LruMap, @@ -129,6 +134,7 @@ impl RuntimeInfo { Self { session_index_cache: LruMap::new(ByLength::new(cfg.session_cache_lru_size.max(10))), session_info_cache: LruMap::new(ByLength::new(cfg.session_cache_lru_size)), + disabled_validators_cache: LruMap::new(ByLength::new(100)), pinned_blocks: LruMap::new(ByLength::new(cfg.session_cache_lru_size)), keystore: cfg.keystore, } @@ -180,6 +186,26 @@ impl RuntimeInfo { self.get_session_info_by_index(sender, relay_parent, session_index).await } + /// Get the list of disabled validators at the relay parent. + pub async fn get_disabled_validators( + &mut self, + sender: &mut Sender, + relay_parent: Hash, + ) -> Result> + where + Sender: SubsystemSender, + { + match self.disabled_validators_cache.get(&relay_parent).cloned() { + Some(result) => Ok(result), + None => { + let disabled_validators = + get_disabled_validators_with_fallback(sender, relay_parent).await?; + self.disabled_validators_cache.insert(relay_parent, disabled_validators.clone()); + Ok(disabled_validators) + }, + } + } + /// Get `ExtendedSessionInfo` by session index. /// /// `request_session_info` still requires the parent to be passed in, so we take the parent diff --git a/polkadot/node/subsystem-util/src/vstaging.rs b/polkadot/node/subsystem-util/src/vstaging.rs index 3efd3b61f93c..3e807eff5387 100644 --- a/polkadot/node/subsystem-util/src/vstaging.rs +++ b/polkadot/node/subsystem-util/src/vstaging.rs @@ -23,7 +23,7 @@ use polkadot_node_subsystem_types::messages::{RuntimeApiMessage, RuntimeApiReque use polkadot_overseer::SubsystemSender; use polkadot_primitives::{Hash, ValidatorIndex}; -use crate::{has_required_runtime, request_disabled_validators, Error}; +use crate::{has_required_runtime, request_disabled_validators, runtime}; const LOG_TARGET: &'static str = "parachain::subsystem-util-vstaging"; @@ -35,7 +35,7 @@ const LOG_TARGET: &'static str = "parachain::subsystem-util-vstaging"; pub async fn get_disabled_validators_with_fallback>( sender: &mut Sender, relay_parent: Hash, -) -> Result, Error> { +) -> Result, runtime::Error> { let disabled_validators = if has_required_runtime( sender, relay_parent, @@ -46,7 +46,7 @@ pub async fn get_disabled_validators_with_fallback bool { - match *self { - Self::Valid(ValidDisputeStatementKind::BackingSeconded(_)) | - Self::Valid(ValidDisputeStatementKind::BackingValid(_)) => true, - Self::Valid(ValidDisputeStatementKind::Explicit) | - Self::Valid(ValidDisputeStatementKind::ApprovalChecking) | - Self::Valid(ValidDisputeStatementKind::ApprovalCheckingMultipleCandidates(_)) | + match self { + Self::Valid(s) => s.is_backing(), Self::Invalid(_) => false, } } @@ -1374,6 +1370,19 @@ pub enum ValidDisputeStatementKind { ApprovalCheckingMultipleCandidates(Vec), } +impl ValidDisputeStatementKind { + /// Whether the statement is from the backing phase. + pub fn is_backing(&self) -> bool { + match self { + ValidDisputeStatementKind::BackingSeconded(_) | + ValidDisputeStatementKind::BackingValid(_) => true, + ValidDisputeStatementKind::Explicit | + ValidDisputeStatementKind::ApprovalChecking | + ValidDisputeStatementKind::ApprovalCheckingMultipleCandidates(_) => false, + } + } +} + /// Different kinds of statements of invalidity on a candidate. #[derive(Encode, Decode, Copy, Clone, PartialEq, RuntimeDebug, TypeInfo)] pub enum InvalidDisputeStatementKind { diff --git a/polkadot/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md b/polkadot/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md index a9cb2741b083..e0738e219d1b 100644 --- a/polkadot/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md +++ b/polkadot/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md @@ -13,6 +13,7 @@ In particular the dispute-coordinator is responsible for: - Ensuring backing votes will never get overridden by explicit votes. - Coordinating actual participation in a dispute, ensuring that the node participates in any justified dispute in a way that ensures resolution of disputes on the network even in the case of many disputes raised (flood/DoS scenario). +- Ensuring disabled validators are not able to spam disputes. - Ensuring disputes resolve, even for candidates on abandoned forks as much as reasonably possible, to rule out "free tries" and thus guarantee our gambler's ruin property. - Providing an API for chain selection, so we can prevent finalization of any chain which has included candidates for @@ -243,6 +244,9 @@ if any of the following holds true: - The dispute is already confirmed: Meaning that 1/3+1 nodes already participated, as this suggests in our threat model that there was at least one honest node that already voted, so the dispute must be genuine. +In addition to that, we only participate in a non-confirmed dispute if at least one vote against the candidate is from +a non-disabled validator. + Note: A node might be out of sync with the chain and we might only learn about a block, including a candidate, after we learned about the dispute. This means, we have to re-evaluate participation decisions on block import! @@ -301,6 +305,7 @@ conditions are satisfied: - the candidate under dispute was not seen included nor backed on any chain - the dispute is not confirmed - we haven't cast a vote for the dispute +- at least one vote against the candidate is from a non-disabled validator Whenever any vote on a dispute is imported these conditions are checked. If the dispute is found not to be potential spam, then spam slots for the disputed candidate hash are cleared. This decrements the spam count for every validator @@ -318,6 +323,23 @@ approval-voting), but we also don't import them until a dispute already conclude opposing votes, so there must be an explicit `invalid` vote in the import. Only a third of the validators can be malicious, so spam disk usage is limited to `2*vote_size*n/3*NUM_SPAM_SLOTS`, with `n` being the number of validators. +### Disabling + +Once a validator has committed an offence (e.g. losing a dispute), it is considered disabled for the rest of the era. +In addition to using the on-chain state of disabled validators, we also keep track of validators who lost a dispute +off-chain. The reason for this is a dispute can be raised for a candidate in a previous era, which means that a +validator that is going to be slashed for it might not even be in the current active set. That means it can't be +disabled on-chain. We need a way to prevent someone from disputing all valid candidates in the previous era. We do this +by keeping track of the validators who lost a dispute in the past few sessions and use that list in addition to the +on-chain disabled validators state. In addition to past session misbehavior, this also heps in case a slash is delayed. + +When we receive a dispute statements set, we do the following: +1. Take the on-chain state of disabled validators at the relay parent block. +1. Take a list of those who lost a dispute in that session in the order that prioritizes the biggest and newest offence. +1. Combine the two lists and take the first byzantine threshold validators from it. +1. If the dispute is unconfimed, check if all votes against the candidate are from disabled validators. +If so, we don't participate in the dispute, but record the votes. + ### Backing Votes Backing votes are in some way special. For starters they are the only valid votes that are guaranteed to exist for any diff --git a/prdoc/pr_2637.prdoc b/prdoc/pr_2637.prdoc new file mode 100644 index 000000000000..a7ab4f93222e --- /dev/null +++ b/prdoc/pr_2637.prdoc @@ -0,0 +1,18 @@ +title: Validator disabling in Dispute Participation. + +doc: + - audience: Node Operator + description: | + Once a validator has been disabled for misbehavior, other validators + should no longer participate in disputes initiated by it. + This feature is needed to ensure robust spam protection against + malicious actors. + +migrations: + db: [] + runtime: [] + +crates: + - name: polkadot-node-core-dispute-coordinator + +host_functions: []