Skip to content

Commit

Permalink
dispute-coordinator: disabling in participation (#2637)
Browse files Browse the repository at this point in the history
Closes #2225.

- [x] tests
- [x] fix todos
- [x] fix duplicates
- [x] make the check part of `potential_spam` 
- [x] fix a bug with votes insertion
- [x] guide changes
- [x] docs

---------

Co-authored-by: Tsvetomir Dimitrov <tsvetomir@parity.io>
  • Loading branch information
ordian and tdimitrov authored Jan 9, 2024
1 parent a02b534 commit 0ff3f4d
Show file tree
Hide file tree
Showing 12 changed files with 1,135 additions and 170 deletions.
7 changes: 4 additions & 3 deletions polkadot/node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1030,9 +1030,10 @@ async fn construct_per_relay_parent_state<Context>(
// Once runtime ver `DISABLED_VALIDATORS_RUNTIME_REQUIREMENT` is released remove this call to
// `get_disabled_validators_with_fallback`, add `request_disabled_validators` call to the
// `try_join!` above and use `try_runtime_api!` to get `disabled_validators`
let disabled_validators = get_disabled_validators_with_fallback(ctx.sender(), parent)
.await
.map_err(Error::UtilError)?;
let disabled_validators =
get_disabled_validators_with_fallback(ctx.sender(), parent).await.map_err(|e| {
Error::UtilError(TryFrom::try_from(e).expect("the conversion is infallible; qed"))
})?;

let signing_context = SigningContext { parent_hash: parent, session_index };
let validator = match Validator::construct(
Expand Down
27 changes: 26 additions & 1 deletion polkadot/node/core/dispute-coordinator/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub struct CandidateEnvironment<'a> {
executor_params: &'a ExecutorParams,
/// Validator indices controlled by this node.
controlled_indices: HashSet<ValidatorIndex>,
/// Indices of disabled validators at the `relay_parent`.
disabled_indices: HashSet<ValidatorIndex>,
}

#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
Expand All @@ -66,6 +68,16 @@ impl<'a> CandidateEnvironment<'a> {
session_index: SessionIndex,
relay_parent: Hash,
) -> Option<CandidateEnvironment<'a>> {
let disabled_indices = runtime_info
.get_disabled_validators(ctx.sender(), relay_parent)
.await
.unwrap_or_else(|err| {
gum::info!(target: LOG_TARGET, ?err, "Failed to get disabled validators");
Vec::new()
})
.into_iter()
.collect();

let (session, executor_params) = match runtime_info
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await
Expand All @@ -76,7 +88,7 @@ impl<'a> CandidateEnvironment<'a> {
};

let controlled_indices = find_controlled_validator_indices(keystore, &session.validators);
Some(Self { session_index, session, executor_params, controlled_indices })
Some(Self { session_index, session, executor_params, controlled_indices, disabled_indices })
}

/// Validators in the candidate's session.
Expand All @@ -103,6 +115,11 @@ impl<'a> CandidateEnvironment<'a> {
pub fn controlled_indices(&'a self) -> &'a HashSet<ValidatorIndex> {
&self.controlled_indices
}

/// Indices of disabled validators at the `relay_parent`.
///
/// Fetched once when the environment is constructed; falls back to an empty set if the
/// runtime call for disabled validators fails.
pub fn disabled_indices(&'a self) -> &'a HashSet<ValidatorIndex> {
	&self.disabled_indices
}
}

/// Whether or not we already issued some statement about a candidate.
Expand Down Expand Up @@ -344,6 +361,14 @@ impl CandidateVoteState<CandidateVotes> {
&self.votes.candidate_receipt
}

/// Returns true if all the invalid votes are from disabled validators.
pub fn invalid_votes_all_disabled(
	&self,
	mut is_disabled: impl FnMut(&ValidatorIndex) -> bool,
) -> bool {
	// Equivalent to `.all(...)`: there must be no invalid voter that is *not* disabled.
	// Vacuously true when there are no invalid votes at all.
	!self.votes.invalid.keys().any(|index| !is_disabled(index))
}

/// Extract `CandidateVotes` for handling import of new statements.
fn into_old_state(self) -> (CandidateVotes, CandidateVoteState<()>) {
let CandidateVoteState { votes, own_vote, dispute_status, byzantine_threshold_against } =
Expand Down
155 changes: 140 additions & 15 deletions polkadot/node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! Dispute coordinator subsystem in initialized state (after first active leaf is received).
use std::{
collections::{BTreeMap, VecDeque},
collections::{BTreeMap, HashSet, VecDeque},
sync::Arc,
};

Expand Down Expand Up @@ -47,6 +47,7 @@ use polkadot_primitives::{
DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, ValidDisputeStatementKind,
ValidatorId, ValidatorIndex,
};
use schnellru::{LruMap, UnlimitedCompact};

use crate::{
db,
Expand Down Expand Up @@ -92,6 +93,9 @@ pub struct InitialData {
pub(crate) struct Initialized {
keystore: Arc<LocalKeystore>,
runtime_info: RuntimeInfo,
/// We have the onchain state of disabled validators as well as the offchain
/// state that is based on the lost disputes.
offchain_disabled_validators: OffchainDisabledValidators,
/// This is the highest `SessionIndex` seen via `ActiveLeavesUpdate`. It doesn't matter if it
/// was cached successfully or not. It is used to detect ancient disputes.
highest_session_seen: SessionIndex,
Expand Down Expand Up @@ -130,10 +134,12 @@ impl Initialized {

let (participation_sender, participation_receiver) = mpsc::channel(1);
let participation = Participation::new(participation_sender, metrics.clone());
let offchain_disabled_validators = OffchainDisabledValidators::default();

Self {
keystore,
runtime_info,
offchain_disabled_validators,
highest_session_seen,
gaps_in_cache,
spam_slots,
Expand Down Expand Up @@ -319,13 +325,16 @@ impl Initialized {
self.runtime_info.pin_block(session_idx, new_leaf.unpin_handle);
// Fetch the last `DISPUTE_WINDOW` number of sessions unless there are no gaps
// in cache and we are not missing too many `SessionInfo`s
let mut lower_bound = session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1);
if !self.gaps_in_cache && self.highest_session_seen > lower_bound {
lower_bound = self.highest_session_seen + 1
}
let prune_up_to = session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1);
let fetch_lower_bound =
if !self.gaps_in_cache && self.highest_session_seen > prune_up_to {
self.highest_session_seen + 1
} else {
prune_up_to
};

// There is a new session. Perform a dummy fetch to cache it.
for idx in lower_bound..=session_idx {
for idx in fetch_lower_bound..=session_idx {
if let Err(err) = self
.runtime_info
.get_session_info_by_index(ctx.sender(), new_leaf.hash, idx)
Expand All @@ -344,11 +353,9 @@ impl Initialized {

self.highest_session_seen = session_idx;

db::v1::note_earliest_session(
overlay_db,
session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1),
)?;
self.spam_slots.prune_old(session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1));
db::v1::note_earliest_session(overlay_db, prune_up_to)?;
self.spam_slots.prune_old(prune_up_to);
self.offchain_disabled_validators.prune_old(prune_up_to);
},
Ok(_) => { /* no new session => nothing to cache */ },
Err(err) => {
Expand Down Expand Up @@ -978,11 +985,13 @@ impl Initialized {
Some(env) => env,
};

let n_validators = env.validators().len();

gum::trace!(
target: LOG_TARGET,
?candidate_hash,
?session,
num_validators = ?env.session_info().validators.len(),
?n_validators,
"Number of validators"
);

Expand Down Expand Up @@ -1084,18 +1093,42 @@ impl Initialized {
target: LOG_TARGET,
?candidate_hash,
?session,
num_validators = ?env.session_info().validators.len(),
?n_validators,
"Import result ready"
);

let new_state = import_result.new_state();

let byzantine_threshold = polkadot_primitives::byzantine_threshold(n_validators);
// combine on-chain with off-chain disabled validators
// process disabled validators in the following order:
// - on-chain disabled validators
// - prioritized order of off-chain disabled validators
// deduplicate the list and take at most `byzantine_threshold` validators
let disabled_validators = {
let mut d: HashSet<ValidatorIndex> = HashSet::new();
for v in env
.disabled_indices()
.iter()
.cloned()
.chain(self.offchain_disabled_validators.iter(session))
{
if d.len() == byzantine_threshold {
break
}
d.insert(v);
}
d
};

let is_included = self.scraper.is_candidate_included(&candidate_hash);
let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
let own_vote_missing = new_state.own_vote_missing();
let is_disputed = new_state.is_disputed();
let is_confirmed = new_state.is_confirmed();
let potential_spam = is_potential_spam(&self.scraper, &new_state, &candidate_hash);
// We participate only in disputes which are not potential spam.
let potential_spam = is_potential_spam(&self.scraper, &new_state, &candidate_hash, |v| {
disabled_validators.contains(v)
});
let allow_participation = !potential_spam;

gum::trace!(
Expand All @@ -1106,6 +1139,7 @@ impl Initialized {
?candidate_hash,
confirmed = ?new_state.is_confirmed(),
has_invalid_voters = ?!import_result.new_invalid_voters().is_empty(),
n_disabled_validators = ?disabled_validators.len(),
"Is spam?"
);

Expand Down Expand Up @@ -1337,6 +1371,10 @@ impl Initialized {
);
}
}
for validator_index in new_state.votes().invalid.keys() {
self.offchain_disabled_validators
.insert_against_valid(session, *validator_index);
}
self.metrics.on_concluded_valid();
}
if import_result.is_freshly_concluded_against() {
Expand All @@ -1356,6 +1394,14 @@ impl Initialized {
);
}
}
for (validator_index, (kind, _sig)) in new_state.votes().valid.raw() {
let is_backer = kind.is_backing();
self.offchain_disabled_validators.insert_for_invalid(
session,
*validator_index,
is_backer,
);
}
self.metrics.on_concluded_invalid();
}

Expand Down Expand Up @@ -1591,3 +1637,82 @@ fn determine_undisputed_chain(

Ok(last)
}

/// Offchain counterpart to the onchain disabled-validators list.
///
/// Tracks validators that lost disputes (per session). It is combined with the onchain
/// disabled list when deciding whether an incoming dispute is potential spam.
#[derive(Default)]
struct OffchainDisabledValidators {
	// Ideally, we want to use the top `byzantine_threshold` offenders here based on the amount of
	// stake slashed. However, given that slashing might be applied with a delay, we want to have
	// some list of offenders as soon as disputes conclude offchain. This list only approximates
	// the top offenders and only accounts for lost disputes. But that should be good enough to
	// prevent spam attacks.
	per_session: BTreeMap<SessionIndex, LostSessionDisputes>,
}

/// Validators that lost disputes within a single session, bucketed by the kind of losing vote.
struct LostSessionDisputes {
	// We separate lost disputes to prioritize "for invalid" offenders. And among those, we
	// prioritize backing votes the most. There's no need to limit the size of these sets, as they
	// are already limited by the number of validators in the session. We use `LruMap` to ensure
	// the iteration order prioritizes most recently lost disputes over older ones in case we
	// reach the limit.
	/// Validators who backed a candidate that a dispute concluded was invalid.
	backers_for_invalid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
	/// Validators who voted valid (non-backing) on a candidate concluded invalid.
	for_invalid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
	/// Validators who voted invalid on a candidate concluded valid.
	against_valid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
}

impl Default for LostSessionDisputes {
	/// Start with all three offender sets empty and unbounded.
	fn default() -> Self {
		// All three maps share the same construction; factor it out.
		let empty = || LruMap::new(UnlimitedCompact);
		Self { backers_for_invalid: empty(), for_invalid: empty(), against_valid: empty() }
	}
}

impl OffchainDisabledValidators {
	/// Drop all state for sessions strictly older than `up_to_excluding`.
	fn prune_old(&mut self, up_to_excluding: SessionIndex) {
		// `split_off` returns everything at and after the given key; keep that part and
		// let the older entries drop.
		self.per_session = self.per_session.split_off(&up_to_excluding);
	}

	/// Record a validator that voted valid on a candidate which was concluded invalid.
	///
	/// Backers are tracked separately since they are prioritized the most when disabling.
	fn insert_for_invalid(
		&mut self,
		session_index: SessionIndex,
		validator_index: ValidatorIndex,
		is_backer: bool,
	) {
		let entry = self.per_session.entry(session_index).or_default();
		let bucket =
			if is_backer { &mut entry.backers_for_invalid } else { &mut entry.for_invalid };
		bucket.insert(validator_index, ());
	}

	/// Record a validator that voted invalid on a candidate which was concluded valid.
	fn insert_against_valid(
		&mut self,
		session_index: SessionIndex,
		validator_index: ValidatorIndex,
	) {
		self.per_session
			.entry(session_index)
			.or_default()
			.against_valid
			.insert(validator_index, ());
	}

	/// Iterate over all validators that are offchain disabled.
	/// The order of iteration prioritizes `for_invalid` offenders (and backers among those) over
	/// `against_valid` offenders. And most recently lost disputes over older ones.
	/// NOTE: the iterator might contain duplicates.
	fn iter(&self, session_index: SessionIndex) -> impl Iterator<Item = ValidatorIndex> + '_ {
		self.per_session.get(&session_index).into_iter().flat_map(|lost| {
			let for_invalid_first = lost.backers_for_invalid.iter().chain(lost.for_invalid.iter());
			for_invalid_first.chain(lost.against_valid.iter()).map(|(index, _)| *index)
		})
	}
}
15 changes: 10 additions & 5 deletions polkadot/node/core/dispute-coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,10 @@ impl DisputeCoordinatorSubsystem {
},
};
let vote_state = CandidateVoteState::new(votes, &env, now);

let potential_spam = is_potential_spam(&scraper, &vote_state, candidate_hash);
let onchain_disabled = env.disabled_indices();
let potential_spam = is_potential_spam(&scraper, &vote_state, candidate_hash, |v| {
onchain_disabled.contains(v)
});
let is_included =
scraper.is_candidate_included(&vote_state.votes().candidate_receipt.hash());

Expand Down Expand Up @@ -462,17 +464,20 @@ async fn wait_for_first_leaf<Context>(ctx: &mut Context) -> Result<Option<Activa
/// Check wheter a dispute for the given candidate could be spam.
///
/// That is the candidate could be made up.
pub fn is_potential_spam<V>(
pub fn is_potential_spam(
scraper: &ChainScraper,
vote_state: &CandidateVoteState<V>,
vote_state: &CandidateVoteState<CandidateVotes>,
candidate_hash: &CandidateHash,
is_disabled: impl FnMut(&ValidatorIndex) -> bool,
) -> bool {
let is_disputed = vote_state.is_disputed();
let is_included = scraper.is_candidate_included(candidate_hash);
let is_backed = scraper.is_candidate_backed(candidate_hash);
let is_confirmed = vote_state.is_confirmed();
let all_invalid_votes_disabled = vote_state.invalid_votes_all_disabled(is_disabled);
let ignore_disabled = !is_confirmed && all_invalid_votes_disabled;

is_disputed && !is_included && !is_backed && !is_confirmed
(is_disputed && !is_included && !is_backed && !is_confirmed) || ignore_disabled
}

/// Tell dispute-distribution to send all our votes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ fn cannot_participate_if_cannot_recover_validation_code() {
let mut participation = Participation::new(sender, Metrics::default());
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
participate(&mut ctx, &mut participation).await.unwrap();

recover_available_data(&mut ctx_handle).await;

assert_matches!(
Expand Down
Loading

0 comments on commit 0ff3f4d

Please sign in to comment.