Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Dispute coordinator - Recover disputes on startup #3481

Merged
137 changes: 131 additions & 6 deletions node/core/dispute-coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
messages::{
ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage,
DisputeParticipationMessage, ImportStatementsResult
DisputeParticipationMessage, ImportStatementsResult,
}
};
use polkadot_node_subsystem_util::rolling_session_window::{
Expand Down Expand Up @@ -71,10 +71,27 @@ const ACTIVE_DURATION_SECS: Timestamp = 180;
/// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots.
type Timestamp = u64;

/// Two-state marker for a one-off action: `Pending` until [`Participation::complete`] is first
/// called, `Complete` from then on.
#[derive(Eq, PartialEq)]
enum Participation {
    /// The action has not been triggered yet.
    Pending,
    /// The action has already been triggered.
    Complete,
}

impl Participation {
    /// Moves the marker to `Complete` and reports the state it was in *before* this call.
    ///
    /// Returns `true` if the marker was already `Complete`, and `false` if this call is the one
    /// that performed the `Pending` -> `Complete` transition.
    fn complete(&mut self) -> bool {
        match self {
            Participation::Complete => true,
            Participation::Pending => {
                *self = Participation::Complete;
                false
            }
        }
    }
}

/// Mutable state of the subsystem's main loop; a fresh value is built at the top of
/// `run_until_error` and then threaded through the message handlers.
struct State {
/// Local keystore, consulted to find out which validator keys belong to this node.
keystore: Arc<LocalKeystore>,
/// Starts out as `None`.
// NOTE(review): presumably the highest session index observed so far — confirm against
// `handle_new_activations`, which is not fully visible here.
highest_session: Option<SessionIndex>,
/// Window of recent session information, created with `DISPUTE_WINDOW`; used to look up the
/// validator set of a session via `session_info`.
rolling_session_window: RollingSessionWindow,
/// Whether startup recovery has been triggered. Starts as `Pending` and is flipped to
/// `Complete` on the first active-leaves update, so `handle_startup` is attempted at most once
/// per `State` value.
recovery_state: Participation,
}

/// Configuration for the dispute coordinator subsystem.
Expand Down Expand Up @@ -277,7 +294,7 @@ where
B: Backend,
{
loop {
let res = run_iteration(&mut ctx, &subsystem, &mut backend, &*clock).await;
let res = run_until_error(&mut ctx, &subsystem, &mut backend, &*clock).await;
match res {
Err(e) => {
e.trace();
Expand All @@ -299,7 +316,7 @@ where
//
// A return value of `Ok` indicates that an exit should be made, while non-fatal errors
// lead to another call to this function.
async fn run_iteration<B, Context>(
async fn run_until_error<B, Context>(
ctx: &mut Context,
subsystem: &DisputeCoordinatorSubsystem,
backend: &mut B,
Expand All @@ -314,6 +331,7 @@ where
keystore: subsystem.keystore.clone(),
highest_session: None,
rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW),
recovery_state: Participation::Pending,
};

loop {
Expand All @@ -328,7 +346,14 @@ where
&mut overlay_db,
&mut state,
update.activated.into_iter().map(|a| a.hash),
).await?
).await?;
if !state.recovery_state.complete() {
handle_startup(
ctx,
&mut overlay_db,
&mut state,
).await?;
}
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
FromOverseer::Communication { msg } => {
Expand All @@ -349,6 +374,98 @@ where
}
}

// Restores the subsystem's state before proceeding with the main event loop. Primarily, this
// repopulates the rolling session window the relevant session information to handle incoming
// import statement requests.
//
// This method also retransmits a `DisputeParticiationMessage::Participate` for any non-concluded
// disputes for which the subsystem doesn't have a local statement, ensuring it eventually makes an
// arbitration on the dispute.
async fn handle_startup<Context>(
ctx: &mut Context,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
) -> Result<(), Error>
where
Context: overseer::SubsystemContext<Message = DisputeCoordinatorMessage>,
Context: SubsystemContext<Message = DisputeCoordinatorMessage>,
{
let recent_disputes = match overlay_db.load_recent_disputes() {
Ok(Some(disputes)) => disputes,
Ok(None) => return Ok(()),
Err(e) => {
tracing::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e);
return Err(e.into());
},
};

// Filter out disputes that have already concluded.
let active_disputes = recent_disputes.into_iter()
.filter(|(_, status)| *status == DisputeStatus::Active)
.collect::<RecentDisputes>();

for ((session, ref candidate_hash), _) in active_disputes.into_iter() {
let votes: CandidateVotes = match overlay_db.load_candidate_votes(session, candidate_hash) {
Ok(Some(votes)) => votes.into(),
Ok(None) => continue,
Err(e) => {
tracing::error!(target: LOG_TARGET, "Failed initial load of candidate votes: {:?}", e);
continue
},
};

let validators = match state.rolling_session_window.session_info(session) {
None => {
tracing::warn!(
target: LOG_TARGET,
session,
"Missing info for session which has an active dispute",
);
continue
}
Some(info) => info.validators.clone(),
};

let n_validators = validators.len();
let voted_indices: HashSet<_> = votes.voted_indices().into_iter().collect();
Lldenaurois marked this conversation as resolved.
Show resolved Hide resolved

// Determine if there are any missing local statements for this dispute. Validators are
// filtered if:
// 1) their statement already exists, or
// 2) the validator key is not in the local keystore (i.e. the validator is remote).
// The remaining set only contains local validators that are also missing statements.
let missing_local_statement = validators.iter()
.enumerate()
.map(|(index, validator)| (ValidatorIndex(index as _), validator))
.any(|(index, validator)|
!voted_indices.contains(&index) &&
state.keystore
.key_pair::<ValidatorPair>(validator)
.ok()
.map_or(false, |v| v.is_some())
);

// Send a `DisputeParticipationMessage` for all non-concluded disputes which do not have a
// recorded local statement.
if missing_local_statement {
let (report_availability, receive_availability) = oneshot::channel();
ctx.send_message(DisputeParticipationMessage::Participate {
Lldenaurois marked this conversation as resolved.
Show resolved Hide resolved
candidate_hash: *candidate_hash,
candidate_receipt: votes.candidate_receipt.clone(),
session,
n_validators: n_validators as u32,
report_availability,
}).await;

if !receive_availability.await? {
tracing::debug!(target: LOG_TARGET, "Participation failed. Candidate not available");
}
}
Lldenaurois marked this conversation as resolved.
Show resolved Hide resolved
}

Ok(())
}

async fn handle_new_activations(
ctx: &mut (impl SubsystemContext<Message = DisputeCoordinatorMessage> + overseer::SubsystemContext<Message = DisputeCoordinatorMessage>),
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
Expand Down Expand Up @@ -470,8 +587,8 @@ async fn handle_incoming(
) => {
issue_local_statement(
ctx,
state,
overlay_db,
state,
candidate_hash,
candidate_receipt,
session,
Expand Down Expand Up @@ -547,6 +664,10 @@ async fn handle_import_statements(
"Missing info for session which has an active dispute",
);

pending_confirmation
.send(ImportStatementsResult::InvalidImport)
.map_err(|_| Error::OneshotSend)?;

return Ok(())
}
Some(info) => info.validators.clone(),
Expand Down Expand Up @@ -671,13 +792,17 @@ async fn handle_import_statements(

overlay_db.write_candidate_votes(session, candidate_hash, votes.into());

pending_confirmation
.send(ImportStatementsResult::ValidImport)
.map_err(|_| Error::OneshotSend)?;

Ok(())
}

async fn issue_local_statement(
ctx: &mut impl SubsystemContext,
state: &mut State,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
Expand Down
Loading