diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index aff7a8ee1499..77ce36a597fa 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -26,7 +26,8 @@ use polkadot_node_subsystem::{ AssignmentCheckError, AssignmentCheckResult, ApprovalCheckError, ApprovalCheckResult, ApprovalVotingMessage, RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage, ApprovalDistributionMessage, CandidateValidationMessage, - AvailabilityRecoveryMessage, ChainSelectionMessage, + AvailabilityRecoveryMessage, ChainSelectionMessage, DisputeCoordinatorMessage, + ImportStatementsResult, }, errors::RecoveryError, overseer::{self, SubsystemSender as _}, SubsystemContext, SubsystemError, SubsystemResult, SpawnedSubsystem, @@ -41,9 +42,10 @@ use polkadot_primitives::v1::{ ValidatorIndex, Hash, SessionIndex, SessionInfo, CandidateHash, CandidateReceipt, BlockNumber, ValidatorPair, ValidatorSignature, ValidatorId, - CandidateIndex, GroupIndex, ApprovalVote, + CandidateIndex, GroupIndex, ApprovalVote, DisputeStatement, + ValidDisputeStatementKind, }; -use polkadot_node_primitives::ValidationResult; +use polkadot_node_primitives::{SignedDisputeStatement, ValidationResult}; use polkadot_node_primitives::approval::{ IndirectAssignmentCert, IndirectSignedApprovalVote, DelayTranche, BlockApprovalMeta, }; @@ -51,7 +53,6 @@ use polkadot_node_jaeger as jaeger; use sc_keystore::LocalKeystore; use sp_consensus::SyncOracle; use sp_consensus_slots::Slot; -use sp_runtime::traits::AppVerify; use sp_application_crypto::Pair; use kvdb::KeyValueDB; @@ -660,6 +661,13 @@ enum Action { candidate: CandidateReceipt, backing_group: GroupIndex, }, + InformDisputeCoordinator { + candidate_hash: CandidateHash, + candidate_receipt: CandidateReceipt, + session: SessionIndex, + dispute_statement: SignedDisputeStatement, + validator_index: ValidatorIndex, + }, NoteApprovedInChainSelection(Hash), IssueApproval(CandidateHash, ApprovalVoteRequest), BecomeActive, @@ -842,7 +850,12 @@ async fn handle_actions( metrics, candidate_hash, approval_request, - )?.into_iter().map(|v| v.clone()).chain(actions_iter).collect(); + ).await? + .into_iter() + .map(|v| v.clone()) + .chain(actions_iter) + .collect(); + actions_iter = next_actions.into_iter(); } Action::LaunchApproval { @@ -905,6 +918,34 @@ async fn handle_actions( Some(_) => {}, } } + Action::InformDisputeCoordinator { + candidate_hash, + candidate_receipt, + session, + dispute_statement, + validator_index, + } => { + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + ctx.send_message(DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt, + session, + statements: vec![(dispute_statement, validator_index)], + pending_confirmation, + }).await; + + match confirmation_rx.await { + Err(oneshot::Canceled) => tracing::warn!( + target: LOG_TARGET, + "Dispute coordinator confirmation lost", + ), + Ok(ImportStatementsResult::ValidImport) => {} + Ok(ImportStatementsResult::InvalidImport) => tracing::warn!( + target: LOG_TARGET, + "Failed to import statements of validity", + ), + } + } Action::NoteApprovedInChainSelection(block_hash) => { ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await; } @@ -1581,9 +1622,6 @@ fn check_and_import_approval( )) }; - let approval_payload = ApprovalVote(approved_candidate_hash) - .signing_payload(block_entry.session()); - let pubkey = match session_info.validators.get(approval.validator.0 as usize) { Some(k) => k, None => respond_early!(ApprovalCheckResult::Bad( @@ -1591,13 +1629,20 @@ fn check_and_import_approval( )) }; - let approval_sig_valid = approval.signature.verify(approval_payload.as_slice(), pubkey); - - if !approval_sig_valid { - respond_early!(ApprovalCheckResult::Bad( + // Transform the approval vote into the wrapper used to import statements into disputes. + // This also does signature checking. + let signed_dispute_statement = match SignedDisputeStatement::new_checked( + DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking), + approved_candidate_hash, + block_entry.session(), + pubkey.clone(), + approval.signature.clone(), + ) { + Err(_) => respond_early!(ApprovalCheckResult::Bad( ApprovalCheckError::InvalidSignature(approval.validator), - )) - } + )), + Ok(s) => s, + }; let candidate_entry = match db.load_candidate_entry(&approved_candidate_hash)? { Some(c) => c, @@ -1635,7 +1680,23 @@ fn check_and_import_approval( "Importing approval vote", ); - let actions = import_checked_approval( + let inform_disputes_action = if !candidate_entry.has_approved(approval.validator) { + // The approval voting system requires a separate approval for each assignment + // to the candidate. It's possible that there are semi-duplicate approvals, + // but we only need to inform the dispute coordinator about the first expressed + // opinion by the validator about the candidate. + Some(Action::InformDisputeCoordinator { + candidate_hash: approved_candidate_hash, + candidate_receipt: candidate_entry.candidate_receipt().clone(), + session: block_entry.session(), + dispute_statement: signed_dispute_statement, + validator_index: approval.validator, + }) + } else { + None + }; + + let mut actions = import_checked_approval( state, db, &metrics, @@ -1645,6 +1706,8 @@ fn check_and_import_approval( ApprovalSource::Remote(approval.validator), ); + actions.extend(inform_disputes_action); + Ok((actions, t)) } @@ -2094,8 +2157,12 @@ async fn launch_approval( (candidate_hash, candidate.descriptor.para_id), ); - // TODO: dispute. Either the merkle trie is bad or the erasure root is. - // https://github.com/paritytech/polkadot/issues/2176 + sender.send_message(DisputeCoordinatorMessage::IssueLocalStatement( + session_index, + candidate_hash, + candidate.clone(), + false, + ).into()).await; metrics_guard.take().on_approval_invalid(); } } @@ -2143,7 +2210,7 @@ async fn launch_approval( sender.send_message(CandidateValidationMessage::ValidateFromExhaustive( available_data.validation_data, validation_code, - candidate.descriptor, + candidate.descriptor.clone(), available_data.pov, val_tx, ).into()).await; @@ -2154,7 +2221,7 @@ async fn launch_approval( validator_index, candidate_hash, ), - Ok(Ok(ValidationResult::Valid(_, _))) => { + Ok(Ok(ValidationResult::Valid(commitments, _))) => { // Validation checked out. Issue an approval command. If the underlying service is unreachable, // then there isn't anything we can do. @@ -2165,11 +2232,28 @@ async fn launch_approval( "Candidate Valid", ); - let _ = metrics_guard.take(); - return ApprovalState::approved( - validator_index, - candidate_hash, - ); + let expected_commitments_hash = candidate.commitments_hash; + if commitments.hash() == expected_commitments_hash { + let _ = metrics_guard.take(); + return ApprovalState::approved( + validator_index, + candidate_hash, + ); + } else { + // Commitments mismatch - issue a dispute. + sender.send_message(DisputeCoordinatorMessage::IssueLocalStatement( + session_index, + candidate_hash, + candidate.clone(), + false, + ).into()).await; + + metrics_guard.take().on_approval_invalid(); + return ApprovalState::failed( + validator_index, + candidate_hash, + ); + } } Ok(Ok(ValidationResult::Invalid(reason))) => { tracing::warn!( @@ -2180,10 +2264,14 @@ async fn launch_approval( "Detected invalid candidate as an approval checker.", ); - // TODO: issue dispute, but not for timeouts. - // https://github.com/paritytech/polkadot/issues/2176 - metrics_guard.take().on_approval_invalid(); + sender.send_message(DisputeCoordinatorMessage::IssueLocalStatement( + session_index, + candidate_hash, + candidate.clone(), + false, + ).into()).await; + metrics_guard.take().on_approval_invalid(); return ApprovalState::failed( validator_index, candidate_hash, @@ -2211,7 +2299,7 @@ async fn launch_approval( // Issue and import a local approval vote. Should only be invoked after approval checks // have been done. -fn issue_approval( +async fn issue_approval( ctx: &mut impl SubsystemSender, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, @@ -2307,19 +2395,20 @@ fn issue_approval( } }; + let session = block_entry.session(); let sig = match sign_approval( &state.keystore, &validator_pubkey, candidate_hash, - block_entry.session(), + session, ) { Some(sig) => sig, None => { tracing::warn!( target: LOG_TARGET, - "Could not issue approval signature with validator index {} in session {}. Assignment key present but not validator key?", - validator_index.0, - block_entry.session(), + validator_index = ?validator_index, + session, + "Could not issue approval signature. Assignment key present but not validator key?", ); metrics.on_approval_error(); @@ -2327,6 +2416,16 @@ fn issue_approval( } }; + // Record our statement in the dispute coordinator for later + // participation in disputes on the same candidate. + let signed_dispute_statement = SignedDisputeStatement::new_checked( + DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking), + candidate_hash, + session, + validator_pubkey.clone(), + sig.clone(), + ).expect("Statement just signed; should pass checks; qed"); + tracing::debug!( target: LOG_TARGET, ?candidate_hash, @@ -2335,7 +2434,25 @@ fn issue_approval( "Issuing approval vote", ); - let actions = import_checked_approval( + let candidate_receipt = candidate_entry.candidate_receipt().clone(); + + let inform_disputes_action = if candidate_entry.has_approved(validator_index) { + // The approval voting system requires a separate approval for each assignment + // to the candidate. It's possible that there are semi-duplicate approvals, + // but we only need to inform the dispute coordinator about the first expressed + // opinion by the validator about the candidate. + Some(Action::InformDisputeCoordinator { + candidate_hash, + candidate_receipt, + session, + dispute_statement: signed_dispute_statement, + validator_index, + }) + } else { + None + }; + + let mut actions = import_checked_approval( state, db, metrics, @@ -2357,6 +2474,9 @@ fn issue_approval( } ).into()); + // dispatch to dispute coordinator. + actions.extend(inform_disputes_action); + Ok(actions) } diff --git a/node/core/approval-voting/src/old_tests.rs b/node/core/approval-voting/src/old_tests.rs index d21bde2dddfe..6a2141dbc31f 100644 --- a/node/core/approval-voting/src/old_tests.rs +++ b/node/core/approval-voting/src/old_tests.rs @@ -17,7 +17,10 @@ use super::*; use super::approval_db::v1::Config; use super::backend::{Backend, BackendWriteOp}; -use polkadot_primitives::v1::{CandidateDescriptor, CoreIndex, GroupIndex, ValidatorSignature}; +use polkadot_primitives::v1::{ + CandidateDescriptor, CoreIndex, GroupIndex, ValidatorSignature, + DisputeStatement, ValidDisputeStatementKind, +}; use polkadot_node_primitives::approval::{ AssignmentCert, AssignmentCertKind, VRFOutput, VRFProof, RELAY_VRF_MODULO_CONTEXT, DelayTranche, @@ -814,7 +817,25 @@ fn accepts_and_imports_approval_after_assignment() { assert_eq!(res, ApprovalCheckResult::Accepted); - assert_eq!(actions.len(), 0); + assert_eq!(actions.len(), 1); + + assert_matches!( + &actions[0], + Action::InformDisputeCoordinator { + dispute_statement, + candidate_hash: c_hash, + validator_index: v, + .. + } => { + assert_matches!( + dispute_statement.statement(), + &DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking) + ); + + assert_eq!(c_hash, &candidate_hash); + assert_eq!(v, &validator_index); + } + ); let write_ops = overlay_db.into_write_ops().collect::>(); assert_eq!(write_ops.len(), 1); diff --git a/node/core/approval-voting/src/persisted_entries.rs b/node/core/approval-voting/src/persisted_entries.rs index 9032f49995e6..b2ca8a69107a 100644 --- a/node/core/approval-voting/src/persisted_entries.rs +++ b/node/core/approval-voting/src/persisted_entries.rs @@ -273,11 +273,16 @@ impl CandidateEntry { /// Note that a given validator has approved. Return the previous approval state. pub fn mark_approval(&mut self, validator: ValidatorIndex) -> bool { - let prev = self.approvals.get(validator.0 as usize).map(|b| *b).unwrap_or(false); + let prev = self.has_approved(validator); self.approvals.set(validator.0 as usize, true); prev } + /// Query whether a given validator has approved the candidate. + pub fn has_approved(&self, validator: ValidatorIndex) -> bool { + self.approvals.get(validator.0 as usize).map(|b| *b).unwrap_or(false) + } + /// Get the candidate receipt. pub fn candidate_receipt(&self) -> &CandidateReceipt { &self.candidate diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index c1de651e4d9c..c18206ae4d39 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -534,6 +534,7 @@ fn ss_rejects_approval_if_no_block_entry() { validator, candidate_hash, session_index, + false, ).await; assert_matches!( @@ -582,6 +583,7 @@ fn ss_rejects_approval_before_assignment() { validator, candidate_hash, session_index, + false, ).await; assert_matches!( @@ -763,6 +765,7 @@ fn ss_accepts_and_imports_approval_after_assignment() { validator, candidate_hash, session_index, + true, ).await; assert_eq!(rx.await, Ok(ApprovalCheckResult::Accepted)); @@ -820,6 +823,7 @@ async fn cai_approval( validator: ValidatorIndex, candidate_hash: CandidateHash, session_index: SessionIndex, + expect_coordinator: bool, ) -> oneshot::Receiver { let signature = sign_approval(Sr25519Keyring::Alice, candidate_hash, session_index); let (tx, rx) = oneshot::channel(); @@ -837,6 +841,18 @@ async fn cai_approval( ), } ).await; + + if expect_coordinator { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ImportStatements { + pending_confirmation, + .. + }) => { + let _ = pending_confirmation.send(ImportStatementsResult::ValidImport); + } + ); + } rx } diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index fa7b0bb3ee3d..682c45536a27 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -30,9 +30,10 @@ use polkadot_primitives::v1::{ BackedCandidate, CandidateCommitments, CandidateDescriptor, CandidateHash, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId, SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation, + SessionIndex, }; use polkadot_node_primitives::{ - Statement, SignedFullStatement, ValidationResult, PoV, AvailableData, + Statement, SignedFullStatement, ValidationResult, PoV, AvailableData, SignedDisputeStatement, }; use polkadot_subsystem::{ PerLeafSpan, Stage, SubsystemSender, @@ -42,7 +43,8 @@ use polkadot_subsystem::{ AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage, ProvisionableData, ProvisionerMessage, RuntimeApiRequest, - StatementDistributionMessage, ValidationFailed + StatementDistributionMessage, ValidationFailed, DisputeCoordinatorMessage, + ImportStatementsResult, } }; use polkadot_node_subsystem_util::{ @@ -151,6 +153,8 @@ impl ValidatedCandidateCommand { pub struct CandidateBackingJob { /// The hash of the relay parent on top of which this job is doing it's work. parent: Hash, + /// The session index this corresponds to. + session_index: SessionIndex, /// The `ParaId` assigned to this validator assignment: Option, /// The collator required to author the candidate, if any. @@ -538,6 +542,8 @@ async fn validate_and_make_available( tx_command.send(make_command(res)).await.map_err(Into::into) } +struct ValidatorIndexOutOfBounds; + impl CandidateBackingJob { /// Run asynchronously. async fn run_loop( @@ -768,12 +774,28 @@ impl CandidateBackingJob { "Importing statement", ); + let candidate_hash = statement.payload().candidate_hash(); let import_statement_span = { // create a span only for candidates we're already aware of. - let candidate_hash = statement.payload().candidate_hash(); self.get_unbacked_statement_child(root_span, candidate_hash, statement.validator_index()) }; + if let Err(ValidatorIndexOutOfBounds) = self.dispatch_new_statement_to_dispute_coordinator( + sender, + candidate_hash, + &statement, + ).await { + tracing::warn!( + target: LOG_TARGET, + session_index = ?self.session_index, + relay_parent = ?self.parent, + validator_index = statement.validator_index().0, + "Supposedly 'Signed' statement has validator index out of bounds." + ); + + return Ok(None); + } + let stmt = primitive_statement_to_table(statement); let summary = self.table.import_statement(&self.table_context, stmt); @@ -824,6 +846,86 @@ impl CandidateBackingJob { Ok(summary) } + /// The dispute coordinator keeps track of all statements by validators about every recent + /// candidate. + /// + /// When importing a statement, this should be called access the candidate receipt either + /// from the statement itself or from the underlying statement table in order to craft + /// and dispatch the notification to the dispute coordinator. + /// + /// This also does bounds-checking on the validator index and will return an error if the + /// validator index is out of bounds for the current validator set. It's expected that + /// this should never happen due to the interface of the candidate backing subsystem - + /// the networking component repsonsible for feeding statements to the backing subsystem + /// is meant to check the signature and provenance of all statements before submission. + async fn dispatch_new_statement_to_dispute_coordinator( + &self, + sender: &mut JobSender, + candidate_hash: CandidateHash, + statement: &SignedFullStatement, + ) -> Result<(), ValidatorIndexOutOfBounds> { + // Dispatch the statement to the dispute coordinator. + let validator_index = statement.validator_index(); + let signing_context = SigningContext { + parent_hash: self.parent, + session_index: self.session_index, + }; + + let validator_public = match self.table_context + .validators + .get(validator_index.0 as usize) + { + None => { + return Err(ValidatorIndexOutOfBounds); + } + Some(v) => v, + }; + + let maybe_candidate_receipt = match statement.payload() { + Statement::Seconded(receipt) => Some(receipt.to_plain()), + Statement::Valid(candidate_hash) => { + // Valid statements are only supposed to be imported + // once we've seen at least one `Seconded` statement. + self.table.get_candidate(&candidate_hash).map(|c| c.to_plain()) + } + }; + + let maybe_signed_dispute_statement = SignedDisputeStatement::from_backing_statement( + statement.as_unchecked(), + signing_context, + validator_public.clone(), + ).ok(); + + if let (Some(candidate_receipt), Some(dispute_statement)) + = (maybe_candidate_receipt, maybe_signed_dispute_statement) + { + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + sender.send_message( + DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt, + session: self.session_index, + statements: vec![(dispute_statement, validator_index)], + pending_confirmation, + } + ).await; + + match confirmation_rx.await { + Err(oneshot::Canceled) => tracing::warn!( + target: LOG_TARGET, + "Dispute coordinator confirmation lost", + ), + Ok(ImportStatementsResult::ValidImport) => {} + Ok(ImportStatementsResult::InvalidImport) => tracing::warn!( + target: LOG_TARGET, + "Failed to import statements of validity", + ), + } + } + + Ok(()) + } + async fn process_msg( &mut self, root_span: &jaeger::Span, @@ -1199,6 +1301,7 @@ impl util::JobTrait for CandidateBackingJob { let (background_tx, background_rx) = mpsc::channel(16); let job = CandidateBackingJob { parent, + session_index, assignment, required_collator, issued_statements: HashSet::new(), diff --git a/node/core/backing/src/tests.rs b/node/core/backing/src/tests.rs index 8aa4a3a9f3cc..fc62873218ab 100644 --- a/node/core/backing/src/tests.rs +++ b/node/core/backing/src/tests.rs @@ -57,6 +57,12 @@ struct TestState { relay_parent: Hash, } +impl TestState { + fn session(&self) -> SessionIndex { + self.signing_context.session_index + } +} + impl Default for TestState { fn default() -> Self { let chain_a = ParaId::from(1); @@ -259,6 +265,35 @@ async fn test_startup( ); } +async fn test_dispute_coordinator_notifications( + virtual_overseer: &mut VirtualOverseer, + candidate_hash: CandidateHash, + session: SessionIndex, + validator_indices: Vec, +) { + for validator_index in validator_indices { + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeCoordinator( + DisputeCoordinatorMessage::ImportStatements { + candidate_hash: c_hash, + candidate_receipt: c_receipt, + session: s, + statements, + pending_confirmation, + } + ) => { + assert_eq!(c_hash, candidate_hash); + assert_eq!(c_receipt.hash(), c_hash); + assert_eq!(s, session); + assert_eq!(statements.len(), 1); + assert_eq!(statements[0].1, validator_index); + let _ = pending_confirmation.send(ImportStatementsResult::ValidImport); + } + ) + } +} + // Test that a `CandidateBackingMessage::Second` issues validation work // and in case validation is successful issues a `StatementDistributionMessage`. #[test] @@ -291,7 +326,6 @@ fn backing_second_works() { virtual_overseer.send(FromOverseer::Communication{ msg: second }).await; - assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( @@ -309,7 +343,7 @@ fn backing_second_works() { new_validation_code: None, processed_downward_messages: 0, hrmp_watermark: 0, - }, test_state.validation_data), + }, test_state.validation_data.clone()), )).unwrap(); } ); @@ -323,6 +357,13 @@ fn backing_second_works() { } ); + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate.hash(), + test_state.session(), + vec![ValidatorIndex(0)], + ).await; + assert_matches!( virtual_overseer.recv().await, AllMessages::StatementDistribution( @@ -404,6 +445,13 @@ fn backing_works() { virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate_a_hash, + test_state.session(), + vec![ValidatorIndex(2)], + ).await; + // Sending a `Statement::Seconded` for our assignment will start // validation process. The first thing requested is the PoV. assert_matches!( @@ -438,7 +486,7 @@ fn backing_works() { new_validation_code: None, processed_downward_messages: 0, hrmp_watermark: 0, - }, test_state.validation_data), + }, test_state.validation_data.clone()), )).unwrap(); } ); @@ -452,6 +500,13 @@ fn backing_works() { } ); + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate_a_hash, + test_state.session(), + vec![ValidatorIndex(0)], + ).await; + assert_matches!( virtual_overseer.recv().await, AllMessages::StatementDistribution( @@ -468,6 +523,13 @@ fn backing_works() { virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate_a_hash, + test_state.session(), + vec![ValidatorIndex(5)], + ).await; + assert_matches!( virtual_overseer.recv().await, AllMessages::Provisioner( @@ -554,6 +616,13 @@ fn backing_works_while_validation_ongoing() { let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_a.clone()); virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate_a.hash(), + test_state.session(), + vec![ValidatorIndex(2)], + ).await; + // Sending a `Statement::Seconded` for our assignment will start // validation process. The first thing requested is PoV from the // `PoVDistribution`. @@ -601,6 +670,13 @@ fn backing_works_while_validation_ongoing() { virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate_a.hash(), + test_state.session(), + vec![ValidatorIndex(5), ValidatorIndex(3)], + ).await; + // Candidate gets backed entirely by other votes. assert_matches!( virtual_overseer.recv().await, @@ -699,6 +775,13 @@ fn backing_misbehavior_works() { virtual_overseer.send(FromOverseer::Communication { msg: statement }).await; + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate_a_hash, + test_state.session(), + vec![ValidatorIndex(2)], + ).await; + assert_matches!( virtual_overseer.recv().await, AllMessages::AvailabilityDistribution( @@ -729,7 +812,7 @@ fn backing_misbehavior_works() { new_validation_code: None, processed_downward_messages: 0, hrmp_watermark: 0, - }, test_state.validation_data), + }, test_state.validation_data.clone()), )).unwrap(); } ); @@ -743,6 +826,13 @@ fn backing_misbehavior_works() { } ); + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate_a_hash, + test_state.session(), + vec![ValidatorIndex(0)], + ).await; + assert_matches!( virtual_overseer.recv().await, AllMessages::StatementDistribution( @@ -760,6 +850,13 @@ fn backing_misbehavior_works() { virtual_overseer.send(FromOverseer::Communication { msg: statement }).await; + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate_a_hash, + test_state.session(), + vec![ValidatorIndex(2)], + ).await; + assert_matches!( virtual_overseer.recv().await, AllMessages::Provisioner( @@ -889,7 +986,7 @@ fn backing_dont_second_invalid() { new_validation_code: None, processed_downward_messages: 0, hrmp_watermark: 0, - }, test_state.validation_data), + }, test_state.validation_data.clone()), )).unwrap(); } ); @@ -903,6 +1000,13 @@ fn backing_dont_second_invalid() { } ); + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate_b.hash(), + test_state.session(), + vec![ValidatorIndex(0)], + ).await; + assert_matches!( virtual_overseer.recv().await, AllMessages::StatementDistribution( @@ -965,6 +1069,13 @@ fn backing_second_after_first_fails_works() { virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate.hash(), + test_state.session(), + vec![ValidatorIndex(2)], + ).await; + // Subsystem requests PoV and requests validation. assert_matches!( virtual_overseer.recv().await, @@ -1087,6 +1198,13 @@ fn backing_works_after_failed_validation() { virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate.hash(), + test_state.session(), + vec![ValidatorIndex(2)], + ).await; + // Subsystem requests PoV and requests validation. assert_matches!( virtual_overseer.recv().await, @@ -1375,6 +1493,13 @@ fn retry_works() { ); virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate.hash(), + test_state.session(), + vec![ValidatorIndex(2)], + ).await; + // Subsystem requests PoV and requests validation. // We cancel - should mean retry on next backing statement. assert_matches!( @@ -1396,12 +1521,39 @@ fn retry_works() { ); virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate.hash(), + test_state.session(), + vec![ValidatorIndex(3)], + ).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::AvailabilityDistribution( + AvailabilityDistributionMessage::FetchPoV { + relay_parent, + tx, + .. + } + ) if relay_parent == test_state.relay_parent => { + std::mem::drop(tx); + } + ); + let statement = CandidateBackingMessage::Statement( test_state.relay_parent, signed_c.clone(), ); virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate.hash(), + test_state.session(), + vec![ValidatorIndex(5)], + ).await; + // Not deterministic which message comes first: for _ in 0u32..2 { match virtual_overseer.recv().await { @@ -1417,15 +1569,15 @@ fn retry_works() { assert_eq!(descriptor, candidate.descriptor); } // Subsystem requests PoV and requests validation. - // We cancel once more: + // Now we pass. AllMessages::AvailabilityDistribution( AvailabilityDistributionMessage::FetchPoV { relay_parent, tx, .. } - ) if relay_parent == test_state.relay_parent => { - std::mem::drop(tx); + ) if relay_parent == test_state.relay_parent => { + tx.send(pov.clone()).unwrap(); } msg => { assert!(false, "Unexpected message: {:?}", msg); @@ -1433,21 +1585,6 @@ fn retry_works() { } } - // Subsystem requests PoV and requests validation. - // Now we pass. - assert_matches!( - virtual_overseer.recv().await, - AllMessages::AvailabilityDistribution( - AvailabilityDistributionMessage::FetchPoV { - relay_parent, - tx, - .. - } - ) if relay_parent == test_state.relay_parent => { - tx.send(pov.clone()).unwrap(); - } - ); - assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( @@ -1547,6 +1684,13 @@ fn observes_backing_even_if_not_validator() { virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + test_dispute_coordinator_notifications( + &mut virtual_overseer, + candidate_a_hash, + test_state.session(), + vec![ValidatorIndex(0), ValidatorIndex(5), ValidatorIndex(2)], + ).await; + assert_matches!( virtual_overseer.recv().await, AllMessages::Provisioner( diff --git a/node/core/dispute-coordinator/src/db/v1.rs b/node/core/dispute-coordinator/src/db/v1.rs index 2253b83c6192..7ec30d51c1ce 100644 --- a/node/core/dispute-coordinator/src/db/v1.rs +++ b/node/core/dispute-coordinator/src/db/v1.rs @@ -18,15 +18,15 @@ use polkadot_primitives::v1::{ CandidateReceipt, ValidDisputeStatementKind, InvalidDisputeStatementKind, ValidatorIndex, - ValidatorSignature, SessionIndex, CandidateHash, + ValidatorSignature, SessionIndex, CandidateHash, Hash, }; use kvdb::{KeyValueDB, DBTransaction}; use parity_scale_codec::{Encode, Decode}; -use crate::DISPUTE_WINDOW; +use crate::{DISPUTE_WINDOW, DisputeStatus}; -const ACTIVE_DISPUTES_KEY: &[u8; 15] = b"active-disputes"; +const RECENT_DISPUTES_KEY: &[u8; 15] = b"recent-disputes"; const EARLIEST_SESSION_KEY: &[u8; 16] = b"earliest-session"; const CANDIDATE_VOTES_SUBKEY: &[u8; 15] = b"candidate-votes"; @@ -102,61 +102,8 @@ impl From for CandidateVotes { } } -/// Meta-key for tracking active disputes. -#[derive(Debug, Default, Clone, Encode, Decode, PartialEq)] -pub struct ActiveDisputes { - /// All disputed candidates, sorted by session index and then by candidate hash. - pub disputed: Vec<(SessionIndex, CandidateHash)>, -} - -impl ActiveDisputes { - /// Whether the set of active disputes contains the given candidate. - pub(crate) fn contains( - &self, - session: SessionIndex, - candidate_hash: CandidateHash, - ) -> bool { - self.disputed.contains(&(session, candidate_hash)) - } - - /// Insert the session and candidate hash from the set of active disputes. - /// Returns 'true' if the entry was not already in the set. - pub(crate) fn insert( - &mut self, - session: SessionIndex, - candidate_hash: CandidateHash, - ) -> bool { - let new_entry = (session, candidate_hash); - - let pos = self.disputed.iter() - .take_while(|&e| &new_entry < e) - .count(); - if self.disputed.get(pos).map_or(false, |&e| new_entry == e) { - false - } else { - self.disputed.insert(pos, new_entry); - true - } - } - - /// Delete the session and candidate hash from the set of active disputes. - /// Returns 'true' if the entry was present. - pub(crate) fn delete( - &mut self, - session: SessionIndex, - candidate_hash: CandidateHash, - ) -> bool { - let new_entry = (session, candidate_hash); - - match self.disputed.iter().position(|e| &new_entry == e) { - None => false, - Some(pos) => { - self.disputed.remove(pos); - true - } - } - } -} +/// The mapping for recent disputes; any which have not yet been pruned for being ancient. +pub type RecentDisputes = std::collections::BTreeMap<(SessionIndex, CandidateHash), DisputeStatus>; /// Errors while accessing things from the DB. #[derive(Debug, thiserror::Error)] @@ -199,19 +146,19 @@ pub(crate) fn load_earliest_session( load_decode(db, config.col_data, EARLIEST_SESSION_KEY) } -/// Load the active disputes, if any. -pub(crate) fn load_active_disputes( +/// Load the recent disputes, if any. +pub(crate) fn load_recent_disputes( db: &dyn KeyValueDB, config: &ColumnConfiguration, -) -> Result> { - load_decode(db, config.col_data, ACTIVE_DISPUTES_KEY) +) -> Result> { + load_decode(db, config.col_data, RECENT_DISPUTES_KEY) } /// An atomic transaction to be commited to the underlying DB. #[derive(Debug, Default, Clone)] pub(crate) struct Transaction { earliest_session: Option, - active_disputes: Option, + recent_disputes: Option, write_candidate_votes: Vec<(SessionIndex, CandidateHash, CandidateVotes)>, delete_candidate_votes: Vec<(SessionIndex, CandidateHash)>, } @@ -224,14 +171,13 @@ impl Transaction { self.earliest_session = Some(session); } - /// Prepare a write to the active disputes stored in the DB. + /// Prepare a write to the recent disputes stored in the DB. /// /// Later calls to this function will override earlier ones. - pub(crate) fn put_active_disputes(&mut self, active: ActiveDisputes) { - self.active_disputes = Some(active); + pub(crate) fn put_recent_disputes(&mut self, recent_disputes: RecentDisputes) { + self.recent_disputes = Some(recent_disputes); } - /// Prepare a write of the candidate votes under the indicated candidate. /// /// Later calls to this function for the same candidate will override earlier ones. @@ -264,8 +210,8 @@ impl Transaction { tx.put_vec(config.col_data, EARLIEST_SESSION_KEY, s.encode()); } - if let Some(a) = self.active_disputes { - tx.put_vec(config.col_data, ACTIVE_DISPUTES_KEY, a.encode()); + if let Some(a) = self.recent_disputes { + tx.put_vec(config.col_data, RECENT_DISPUTES_KEY, a.encode()); } for (session, candidate_hash, votes) in self.write_candidate_votes { @@ -305,16 +251,20 @@ pub(crate) fn note_current_session( // Prune all data in the outdated sessions. tx.put_earliest_session(new_earliest); - // Clear active disputes metadata. + // Clear recent disputes metadata. { - let mut active_disputes = load_active_disputes(store, config)?.unwrap_or_default(); - let prune_up_to = active_disputes.disputed.iter() - .take_while(|s| s.0 < new_earliest) - .count(); - - if prune_up_to > 0 { - let _ = active_disputes.disputed.drain(..prune_up_to); - tx.put_active_disputes(active_disputes); + let mut recent_disputes = load_recent_disputes(store, config)?.unwrap_or_default(); + + let lower_bound = ( + new_earliest, + CandidateHash(Hash::repeat_byte(0x00)), + ); + + let prev_len = recent_disputes.len(); + recent_disputes = recent_disputes.split_off(&lower_bound); + + if recent_disputes.len() != prev_len { + tx.put_recent_disputes(recent_disputes); } } @@ -371,17 +321,13 @@ mod tests { tx.put_earliest_session(0); tx.put_earliest_session(1); - tx.put_active_disputes(ActiveDisputes { - disputed: vec![ - (0, CandidateHash(Hash::repeat_byte(0))), - ], - }); + tx.put_recent_disputes(vec![ + ((0, CandidateHash(Hash::repeat_byte(0))), DisputeStatus::Active), + ].into_iter().collect()); - tx.put_active_disputes(ActiveDisputes { - disputed: vec![ - (1, CandidateHash(Hash::repeat_byte(1))), - ], - }); + tx.put_recent_disputes(vec![ + ((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active), + ].into_iter().collect()); tx.put_candidate_votes( 1, @@ -418,12 +364,10 @@ mod tests { ); assert_eq!( - load_active_disputes(&store, &config).unwrap().unwrap(), - ActiveDisputes { - disputed: vec![ - (1, CandidateHash(Hash::repeat_byte(1))), - ], - }, + load_recent_disputes(&store, &config).unwrap().unwrap(), + vec![ + ((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active), + ].into_iter().collect() ); assert_eq!( @@ -527,14 +471,12 @@ mod tests { { let mut tx = Transaction::default(); tx.put_earliest_session(prev_earliest_session); - tx.put_active_disputes(ActiveDisputes { - disputed: vec![ - (very_old, hash_a), - (slightly_old, hash_b), - (new_earliest_session, hash_c), - (very_recent, hash_d), - ], - }); + tx.put_recent_disputes(vec![ + ((very_old, hash_a), DisputeStatus::Active), + ((slightly_old, hash_b), DisputeStatus::Active), + ((new_earliest_session, hash_c), DisputeStatus::Active), + ((very_recent, hash_d), DisputeStatus::Active), + ].into_iter().collect()); tx.put_candidate_votes( very_old, @@ -571,10 +513,11 @@ mod tests { ); assert_eq!( - load_active_disputes(&store, &config).unwrap().unwrap(), - ActiveDisputes { - disputed: vec![(new_earliest_session, hash_c), (very_recent, hash_d)], - }, + load_recent_disputes(&store, &config).unwrap().unwrap(), + vec![ + ((new_earliest_session, hash_c), DisputeStatus::Active), + ((very_recent, hash_d), DisputeStatus::Active), + ].into_iter().collect(), ); assert!(load_candidate_votes(&store, &config, very_old, &hash_a).unwrap().is_none()); diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index c7038e424611..c311529acd8b 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -27,6 +27,7 @@ use std::collections::HashSet; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use polkadot_node_primitives::{CandidateVotes, DISPUTE_WINDOW, DisputeMessage, SignedDisputeStatement, DisputeMessageCheckError}; use polkadot_node_subsystem::{ @@ -48,10 +49,10 @@ use polkadot_primitives::v1::{ use futures::prelude::*; use futures::channel::oneshot; use kvdb::KeyValueDB; -use parity_scale_codec::Error as CodecError; +use parity_scale_codec::{Encode, Decode, Error as CodecError}; use sc_keystore::LocalKeystore; -use db::v1::ActiveDisputes; +use db::v1::RecentDisputes; mod db; @@ -60,6 +61,14 @@ mod tests; const LOG_TARGET: &str = "parachain::dispute-coordinator"; +// The choice here is fairly arbitrary. But any dispute that concluded more than a few minutes ago +// is not worth considering anymore. Changing this value has little to no bearing on consensus, +// and really only affects the work that the node might do on startup during periods of many disputes. +const ACTIVE_DURATION_SECS: Timestamp = 180; + +/// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots. +type Timestamp = u64; + struct State { keystore: Arc, highest_session: Option, @@ -103,7 +112,7 @@ where Context: overseer::SubsystemContext, { fn start(self, ctx: Context) -> SpawnedSubsystem { - let future = run(self, ctx) + let future = run(self, ctx, Box::new(SystemClock)) .map(|_| Ok(())) .boxed(); @@ -114,6 +123,35 @@ where } } +trait Clock: Send + Sync { + fn now(&self) -> Timestamp; +} + +struct SystemClock; + +impl Clock for SystemClock { + fn now(&self) -> Timestamp { + // `SystemTime` is notoriously non-monotonic, so our timers might not work + // exactly as expected. + // + // Regardless, disputes are considered active based on an order of minutes, + // so a few seconds of slippage in either direction shouldn't affect the + // amount of work the node is doing significantly. + match SystemTime::now().duration_since(UNIX_EPOCH) { + Ok(d) => d.as_secs(), + Err(e) => { + tracing::warn!( + target: LOG_TARGET, + err = ?e, + "Current time is before unix epoch. Validation will not work correctly." + ); + + 0 + } + } + } +} + #[derive(Debug, thiserror::Error)] #[allow(missing_docs)] pub enum Error { @@ -160,13 +198,81 @@ impl Error { } } -async fn run(subsystem: DisputeCoordinatorSubsystem, mut ctx: Context) +/// The status of dispute. This is a state machine which can be altered by the +/// helper methods. +#[derive(Debug, Clone, Copy, Encode, Decode, PartialEq)] +pub enum DisputeStatus { + /// The dispute is active and unconcluded. + #[codec(index = 0)] + Active, + /// The dispute has been concluded in favor of the candidate + /// since the given timestamp. + #[codec(index = 1)] + ConcludedFor(Timestamp), + /// The dispute has been concluded agains the candidate + /// since the given timestamp. + /// + /// This takes precedence over `ConcludedFor` in the case that + /// both are true, which is impossible unless a large amount of + /// validators are participating on both sides. + #[codec(index = 2)] + ConcludedAgainst(Timestamp), +} + +impl DisputeStatus { + /// Initialize the status to the active state. + pub fn active() -> DisputeStatus { + DisputeStatus::Active + } + + /// Transition the status to a new status after observing the dispute has concluded for the candidate. + /// This may be a no-op if the status was already concluded. + pub fn concluded_for(self, now: Timestamp) -> DisputeStatus { + match self { + DisputeStatus::Active => DisputeStatus::ConcludedFor(now), + DisputeStatus::ConcludedFor(at) => DisputeStatus::ConcludedFor(std::cmp::min(at, now)), + against => against, + } + } + + /// Transition the status to a new status after observing the dispute has concluded against the candidate. + /// This may be a no-op if the status was already concluded. + pub fn concluded_against(self, now: Timestamp) -> DisputeStatus { + match self { + DisputeStatus::Active => DisputeStatus::ConcludedAgainst(now), + DisputeStatus::ConcludedFor(at) => DisputeStatus::ConcludedAgainst(std::cmp::min(at, now)), + DisputeStatus::ConcludedAgainst(at) => DisputeStatus::ConcludedAgainst(std::cmp::min(at, now)), + } + } + + /// Whether the disputed candidate is possibly invalid. + pub fn is_possibly_invalid(&self) -> bool { + match self { + DisputeStatus::Active | DisputeStatus::ConcludedAgainst(_) => true, + DisputeStatus::ConcludedFor(_) => false, + } + } + + /// Yields the timestamp this dispute concluded at, if any. + pub fn concluded_at(&self) -> Option { + match self { + DisputeStatus::Active => None, + DisputeStatus::ConcludedFor(at) | DisputeStatus::ConcludedAgainst(at) => Some(*at), + } + } +} + +async fn run( + subsystem: DisputeCoordinatorSubsystem, + mut ctx: Context, + clock: Box, +) where Context: overseer::SubsystemContext, Context: SubsystemContext { loop { - let res = run_iteration(&mut ctx, &subsystem).await; + let res = run_iteration(&mut ctx, &subsystem, &*clock).await; match res { Err(e) => { e.trace(); @@ -188,7 +294,11 @@ where // // A return value of `Ok` indicates that an exit should be made, while non-fatal errors // lead to another call to this function. -async fn run_iteration(ctx: &mut Context, subsystem: &DisputeCoordinatorSubsystem) +async fn run_iteration( + ctx: &mut Context, + subsystem: &DisputeCoordinatorSubsystem, + clock: &dyn Clock, +) -> Result<(), Error> where Context: overseer::SubsystemContext, @@ -223,6 +333,7 @@ where &mut state, config, msg, + clock.now(), ).await? } } @@ -286,8 +397,6 @@ async fn handle_new_activations( } _ => {} } - - // TODO [after https://github.com/paritytech/polkadot/issues/3160]: chain rollbacks } Ok(()) @@ -299,6 +408,7 @@ async fn handle_incoming( state: &mut State, config: &Config, message: DisputeCoordinatorMessage, + now: Timestamp, ) -> Result<(), Error> { match message { DisputeCoordinatorMessage::ImportStatements { @@ -317,15 +427,21 @@ async fn handle_incoming( candidate_receipt, session, statements, + now, pending_confirmation, ).await?; } + DisputeCoordinatorMessage::RecentDisputes(rx) => { + let recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())? + .unwrap_or_default(); + + let _ = rx.send(recent_disputes.keys().cloned().collect()); + } DisputeCoordinatorMessage::ActiveDisputes(rx) => { - let active_disputes = db::v1::load_active_disputes(store, &config.column_config())? - .map(|d| d.disputed) + let recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())? .unwrap_or_default(); - let _ = rx.send(active_disputes); + let _ = rx.send(collect_active(recent_disputes, now)); } DisputeCoordinatorMessage::QueryCandidateVotes( session, @@ -356,6 +472,7 @@ async fn handle_incoming( candidate_receipt, session, valid, + now, ).await?; } DisputeCoordinatorMessage::DetermineUndisputedChain { @@ -377,6 +494,15 @@ async fn handle_incoming( Ok(()) } +fn collect_active(recent_disputes: RecentDisputes, now: Timestamp) -> Vec<(SessionIndex, CandidateHash)> { + recent_disputes.iter().filter_map(|(disputed, status)| + status.concluded_at().filter(|at| at + ACTIVE_DURATION_SECS < now).map_or( + Some(*disputed), + |_| None, + ) + ).collect() +} + fn insert_into_statement_vec( vec: &mut Vec<(T, ValidatorIndex, ValidatorSignature)>, tag: T, @@ -400,6 +526,7 @@ async fn handle_import_statements( candidate_receipt: CandidateReceipt, session: SessionIndex, statements: Vec<(SignedDisputeStatement, ValidatorIndex)>, + now: Timestamp, pending_confirmation: oneshot::Sender, ) -> Result<(), Error> { if state.highest_session.map_or(true, |h| session + DISPUTE_WINDOW < h) { @@ -440,8 +567,6 @@ async fn handle_import_statements( invalid: Vec::new(), }); - let was_undisputed = votes.valid.is_empty() || votes.invalid.is_empty(); - // Update candidate votes. for (statement, val_index) in statements { if validators.get(val_index.0 as usize) @@ -480,75 +605,81 @@ async fn handle_import_statements( // Check if newly disputed. let is_disputed = !votes.valid.is_empty() && !votes.invalid.is_empty(); - let freshly_disputed = is_disputed && was_undisputed; - let already_disputed = is_disputed && !was_undisputed; let concluded_valid = votes.valid.len() >= supermajority_threshold; + let concluded_invalid = votes.invalid.len() >= supermajority_threshold; + + let mut recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())? + .unwrap_or_default(); { // Scope so we will only confirm valid import after the import got actually persisted. let mut tx = db::v1::Transaction::default(); - if freshly_disputed && !concluded_valid { + let prev_status = recent_disputes.get(&(session, candidate_hash)).map(|x| x.clone()); - let (report_availability, receive_availability) = oneshot::channel(); - ctx.send_message(DisputeParticipationMessage::Participate { - candidate_hash, - candidate_receipt, - session, - n_validators: n_validators as u32, - report_availability, - }).await; + let status = if is_disputed { + let status = recent_disputes + .entry((session, candidate_hash)) + .or_insert(DisputeStatus::active()); - if !receive_availability.await.map_err(Error::Oneshot)? { - pending_confirmation.send(ImportStatementsResult::InvalidImport).map_err(|_| Error::OneshotSend)?; - tracing::debug!( - target: LOG_TARGET, - "Recovering availability failed - invalid import." - ); - return Ok(()) + // Note: concluded-invalid overwrites concluded-valid, + // so we do this check first. Dispute state machine is + // non-commutative. + if concluded_valid { + *status = status.concluded_for(now); } - // add to active disputes and begin local participation. - update_active_disputes( - store, - config, - &mut tx, - |active| active.insert(session, candidate_hash), - )?; + if concluded_invalid { + *status = status.concluded_against(now); + } - } + Some(*status) + } else { + None + }; - if concluded_valid && already_disputed { - // remove from active disputes. - update_active_disputes( - store, - config, - &mut tx, - |active| active.delete(session, candidate_hash), - )?; + if status != prev_status { + // Only write when updated. + tx.put_recent_disputes(recent_disputes); + + // This branch is only hit when the candidate is freshly disputed - + // status was previously `None`, and now is not. + if prev_status.is_none() { + // No matter what, if the dispute is new, we participate. + // + // We also block the coordinator while awaiting our determination + // of whether the vote is available. + let (report_availability, receive_availability) = oneshot::channel(); + ctx.send_message(DisputeParticipationMessage::Participate { + candidate_hash, + candidate_receipt, + session, + n_validators: n_validators as u32, + report_availability, + }).await; + + if !receive_availability.await.map_err(Error::Oneshot)? { + // If the data is not available, we disregard the dispute votes. + // This is an indication that the dispute does not correspond to any included + // candidate and that it should be ignored. + // + // We expect that if the candidate is truly disputed that the higher-level network + // code will retry. + pending_confirmation.send(ImportStatementsResult::InvalidImport) + .map_err(|_| Error::OneshotSend)?; + + tracing::debug!( + target: LOG_TARGET, + "Recovering availability failed - invalid import." + ); + return Ok(()) + } + } } tx.put_candidate_votes(session, candidate_hash, votes.into()); tx.write(store, &config.column_config())?; } - pending_confirmation.send(ImportStatementsResult::ValidImport).map_err(|_| Error::OneshotSend)?; - - Ok(()) -} - -fn update_active_disputes( - store: &dyn KeyValueDB, - config: &Config, - tx: &mut db::v1::Transaction, - with_active: impl FnOnce(&mut ActiveDisputes) -> bool, -) -> Result<(), Error> { - let mut active_disputes = db::v1::load_active_disputes(store, &config.column_config())? - .unwrap_or_default(); - - if with_active(&mut active_disputes) { - tx.put_active_disputes(active_disputes); - } - Ok(()) } @@ -561,6 +692,7 @@ async fn issue_local_statement( candidate_receipt: CandidateReceipt, session: SessionIndex, valid: bool, + now: Timestamp, ) -> Result<(), Error> { // Load session info. let info = match state.rolling_session_window.session_info(session) { @@ -658,6 +790,7 @@ async fn issue_local_statement( candidate_receipt, session, statements, + now, pending_confirmation, ).await?; } @@ -729,14 +862,21 @@ fn determine_undisputed_chain( .map(|e| (base_number + block_descriptions.len() as BlockNumber, e.0)); // Fast path for no disputes. - let active_disputes = match db::v1::load_active_disputes(store, &config.column_config())? { + let recent_disputes = match db::v1::load_recent_disputes(store, &config.column_config())? { None => return Ok(last), - Some(a) if a.disputed.is_empty() => return Ok(last), + Some(a) if a.is_empty() => return Ok(last), Some(a) => a, }; + let is_possibly_invalid = |session, candidate_hash| { + recent_disputes.get(&(session, candidate_hash)).map_or( + false, + |status| status.is_possibly_invalid(), + ) + }; + for (i, (_, session, candidates)) in block_descriptions.iter().enumerate() { - if candidates.iter().any(|c| active_disputes.contains(*session, *c)) { + if candidates.iter().any(|c| is_possibly_invalid(*session, *c)) { if i == 0 { return Ok(None); } else { diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index cfa84e3818c1..ed5a7da062a7 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -32,6 +32,8 @@ use futures::{ use parity_scale_codec::Encode; use assert_matches::assert_matches; +use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; + // sets up a keystore with the given keyring accounts. fn make_keystore(accounts: &[Sr25519Keyring]) -> LocalKeystore { let store = LocalKeystore::in_memory(); @@ -52,6 +54,29 @@ fn session_to_hash(session: SessionIndex, extra: impl Encode) -> Hash { type VirtualOverseer = TestSubsystemContextHandle; +#[derive(Clone)] +struct MockClock { + time: Arc, +} + +impl Default for MockClock { + fn default() -> Self { + MockClock { time: Arc::new(AtomicU64::default()) } + } +} + +impl Clock for MockClock { + fn now(&self) -> Timestamp { + self.time.load(AtomicOrdering::SeqCst) + } +} + +impl MockClock { + fn set(&self, to: Timestamp) { + self.time.store(to, AtomicOrdering::SeqCst) + } +} + struct TestState { validators: Vec, validator_public: Vec, @@ -60,6 +85,7 @@ struct TestState { subsystem_keystore: Arc, db: Arc, config: Config, + clock: MockClock, } impl Default for TestState { @@ -99,6 +125,7 @@ impl Default for TestState { subsystem_keystore, db, config, + clock: MockClock::default(), } } } @@ -223,7 +250,7 @@ fn test_harness(test: F) state.subsystem_keystore.clone(), ); - let subsystem_task = run(subsystem, ctx); + let subsystem_task = run(subsystem, ctx, Box::new(state.clock.clone())); let test_task = test(state, ctx_handle); futures::executor::block_on(future::join(subsystem_task, test_task)); @@ -693,14 +720,6 @@ fn supermajority_valid_dispute_may_be_finalized() { { let (tx, rx) = oneshot::channel(); - virtual_overseer.send(FromOverseer::Communication { - msg: DisputeCoordinatorMessage::ActiveDisputes(tx), - }).await; - - assert!(rx.await.unwrap().is_empty()); - - let (tx, rx) = oneshot::channel(); - let block_hash_a = Hash::repeat_byte(0x0a); let block_hash_b = Hash::repeat_byte(0x0b); @@ -735,3 +754,292 @@ fn supermajority_valid_dispute_may_be_finalized() { assert!(virtual_overseer.try_recv().await.is_none()); })); } + +#[test] +fn concluded_supermajority_for_non_active_after_time() { + test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + let session = 1; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + test_state.activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + ).await; + + let supermajority_threshold = polkadot_primitives::v1::supermajority_threshold( + test_state.validators.len() + ); + + let valid_vote = test_state.issue_statement_with_index( + 0, + candidate_hash, + session, + true, + ).await; + + let invalid_vote = test_state.issue_statement_with_index( + 1, + candidate_hash, + session, + false, + ).await; + + let (pending_confirmation, _confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (valid_vote, ValidatorIndex(0)), + (invalid_vote, ValidatorIndex(1)), + ], + pending_confirmation, + }, + }).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + report_availability, + .. + } + ) => { + report_availability.send(true).unwrap(); + } + ); + + let mut statements = Vec::new(); + for i in (0..supermajority_threshold - 1).map(|i| i + 2) { + let vote = test_state.issue_statement_with_index( + i, + candidate_hash, + session, + true, + ).await; + + statements.push((vote, ValidatorIndex(i as _))); + }; + + let (pending_confirmation, _confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements, + pending_confirmation, + }, + }).await; + + test_state.clock.set(ACTIVE_DURATION_SECS + 1); + + { + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }).await; + + assert!(rx.await.unwrap().is_empty()); + + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::RecentDisputes(tx), + }).await; + + assert_eq!(rx.await.unwrap().len(), 1); + } + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + })); +} + +#[test] +fn concluded_supermajority_against_non_active_after_time() { + test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + let session = 1; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + test_state.activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + ).await; + + let supermajority_threshold = polkadot_primitives::v1::supermajority_threshold( + test_state.validators.len() + ); + + let valid_vote = test_state.issue_statement_with_index( + 0, + candidate_hash, + session, + true, + ).await; + + let invalid_vote = test_state.issue_statement_with_index( + 1, + candidate_hash, + session, + false, + ).await; + + let (pending_confirmation, _confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (valid_vote, ValidatorIndex(0)), + (invalid_vote, ValidatorIndex(1)), + ], + pending_confirmation, + }, + }).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + report_availability, + .. + } + ) => { + report_availability.send(true).unwrap(); + } + ); + + let mut statements = Vec::new(); + for i in (0..supermajority_threshold - 1).map(|i| i + 2) { + let vote = test_state.issue_statement_with_index( + i, + candidate_hash, + session, + false, + ).await; + + statements.push((vote, ValidatorIndex(i as _))); + }; + + let (pending_confirmation, _confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements, + pending_confirmation, + }, + }).await; + + test_state.clock.set(ACTIVE_DURATION_SECS + 1); + + { + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }).await; + + assert!(rx.await.unwrap().is_empty()); + + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::RecentDisputes(tx), + }).await; + + assert_eq!(rx.await.unwrap().len(), 1); + } + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + })); +} + +#[test] +fn fresh_dispute_ignored_if_unavailable() { + test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + let session = 1; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + test_state.activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + ).await; + + let valid_vote = test_state.issue_statement_with_index( + 0, + candidate_hash, + session, + true, + ).await; + + let invalid_vote = test_state.issue_statement_with_index( + 1, + candidate_hash, + session, + false, + ).await; + + let (pending_confirmation, _confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (valid_vote, ValidatorIndex(0)), + (invalid_vote, ValidatorIndex(1)), + ], + pending_confirmation, + }, + }).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + report_availability, + .. + } + ) => { + report_availability.send(false).unwrap(); + } + ); + + { + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }).await; + + assert!(rx.await.unwrap().is_empty()); + + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::RecentDisputes(tx), + }).await; + + assert!(rx.await.unwrap().is_empty()); + } + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + })); +} diff --git a/node/core/provisioner/src/lib.rs b/node/core/provisioner/src/lib.rs index 85f10c68a74c..e1ca8284239f 100644 --- a/node/core/provisioner/src/lib.rs +++ b/node/core/provisioner/src/lib.rs @@ -23,12 +23,13 @@ use bitvec::vec::BitVec; use futures::{ channel::{mpsc, oneshot}, prelude::*, + stream::FuturesOrdered, }; use polkadot_node_subsystem::{ errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, SubsystemSender, jaeger, messages::{ CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData, - ProvisionerMessage, + ProvisionerMessage, DisputeCoordinatorMessage, }, }; use polkadot_node_subsystem_util::{ @@ -37,7 +38,8 @@ use polkadot_node_subsystem_util::{ }; use polkadot_primitives::v1::{ BackedCandidate, BlockNumber, CandidateReceipt, CoreState, Hash, OccupiedCoreAssumption, - SignedAvailabilityBitfield, ValidatorIndex, + SignedAvailabilityBitfield, ValidatorIndex, MultiDisputeStatementSet, DisputeStatementSet, + DisputeStatement, }; use std::{pin::Pin, collections::BTreeMap, sync::Arc}; use thiserror::Error; @@ -113,6 +115,9 @@ pub enum Error { #[error("failed to get backed candidates")] CanceledBackedCandidates(#[source] oneshot::Canceled), + #[error("failed to get votes on dispute")] + CanceledCandidateVotes(#[source] oneshot::Canceled), + #[error(transparent)] ChainApi(#[from] ChainApiError), @@ -298,10 +303,12 @@ async fn send_inherent_data( from_job, ).await?; + let disputes = select_disputes(from_job).await?; + let inherent_data = ProvisionerInherentData { bitfields, backed_candidates: candidates, - disputes: Vec::new(), // until disputes are implemented. + disputes, }; for return_sender in return_senders { @@ -531,6 +538,85 @@ fn bitfields_indicate_availability( 3 * availability.count_ones() >= 2 * availability.len() } +async fn select_disputes( + sender: &mut impl SubsystemSender, +) -> Result { + let (tx, rx) = oneshot::channel(); + + // We use `RecentDisputes` instead of `ActiveDisputes` because redundancy is fine. + // It's heavier than `ActiveDisputes` but ensures that everything from the dispute + // window gets on-chain, unlike `ActiveDisputes`. + // + // This should have no meaningful impact on performance on production networks for + // two reasons: + // 1. In large validator sets, a node might be a block author 1% or less of the time. + // this code-path only triggers in the case of being a block author. + // 2. Disputes are expected to be rare because they come with heavy slashing. + sender.send_message(DisputeCoordinatorMessage::RecentDisputes(tx).into()).await; + + let recent_disputes = match rx.await { + Ok(r) => r, + Err(oneshot::Canceled) => { + tracing::debug!( + target: LOG_TARGET, + "Unable to gather recent disputes - subsystem disconnected?", + ); + + Vec::new() + } + }; + + // Load all votes for all disputes from the coordinator. + let dispute_candidate_votes = { + let mut awaited_votes = FuturesOrdered::new(); + + let n_disputes = recent_disputes.len(); + for (session_index, candidate_hash) in recent_disputes { + let (tx, rx) = oneshot::channel(); + sender.send_message(DisputeCoordinatorMessage::QueryCandidateVotes( + session_index, + candidate_hash, + tx, + ).into()).await; + + awaited_votes.push(async move { + rx.await + .map_err(Error::CanceledCandidateVotes) + .map(|maybe_votes| maybe_votes.map(|v| (session_index, candidate_hash, v))) + }); + } + + // Sadly `StreamExt::collect` requires `Default`, so we have to do this more + // manually. + let mut vote_sets = Vec::with_capacity(n_disputes); + while let Some(res) = awaited_votes.next().await { + // sanity check - anything present in recent disputes should have + // candidate votes. but we might race with block import on + // session boundaries. + if let Some(vote_set) = res? { + vote_sets.push(vote_set); + } + } + + vote_sets + }; + + // Transform all `CandidateVotes` into `MultiDisputeStatementSet`. + Ok(dispute_candidate_votes.into_iter().map(|(session_index, candidate_hash, votes)| { + let valid_statements = votes.valid.into_iter() + .map(|(s, i, sig)| (DisputeStatement::Valid(s), i, sig)); + + let invalid_statements = votes.invalid.into_iter() + .map(|(s, i, sig)| (DisputeStatement::Invalid(s), i, sig)); + + DisputeStatementSet { + candidate_hash, + session: session_index, + statements: valid_statements.chain(invalid_statements).collect(), + } + }).collect()) +} + #[derive(Clone)] struct MetricsInner { inherent_data_requests: prometheus::CounterVec, diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index ab30b9557df5..1185d768c1a6 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -837,7 +837,11 @@ where fn handle_external_request(&mut self, request: ExternalRequest) { match request { ExternalRequest::WaitForActivation { hash, response_channel } => { - if self.active_leaves.get(&hash).is_some() { + // We use known leaves here because the `WaitForActivation` message + // is primarily concerned about leaves which subsystems have simply + // not been made aware of yet. Anything in the known leaves set, + // even if stale, has been activated in the past. + if self.known_leaves.peek(&hash).is_some() { // it's fine if the listener is no longer interested let _ = response_channel.send(Ok(())); } else { diff --git a/node/primitives/src/disputes/mod.rs b/node/primitives/src/disputes/mod.rs index faae7f2ffb1c..b302612f00c5 100644 --- a/node/primitives/src/disputes/mod.rs +++ b/node/primitives/src/disputes/mod.rs @@ -21,7 +21,12 @@ use parity_scale_codec::{Decode, Encode}; use sp_application_crypto::AppKey; use sp_keystore::{CryptoStore, SyncCryptoStorePtr, Error as KeystoreError}; -use polkadot_primitives::v1::{CandidateHash, CandidateReceipt, DisputeStatement, InvalidDisputeStatementKind, SessionIndex, ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorSignature}; +use polkadot_primitives::v1::{ + CandidateHash, CandidateReceipt, DisputeStatement, InvalidDisputeStatementKind, + SessionIndex, ValidDisputeStatementKind, ValidatorId, ValidatorIndex, + ValidatorSignature, SigningContext, +}; +use super::{UncheckedSignedFullStatement, Statement}; /// `DisputeMessage` and related types. mod message; @@ -146,6 +151,39 @@ impl SignedDisputeStatement { pub fn session_index(&self) -> SessionIndex { self.session_index } + + /// Convert a [`SignedFullStatement`] to a [`SignedDisputeStatement`] + /// + /// As [`SignedFullStatement`] contains only the validator index and + /// not the validator public key, the public key must be passed as well, + /// along with the signing context. + /// + /// This does signature checks again with the data provided. + pub fn from_backing_statement( + backing_statement: &UncheckedSignedFullStatement, + signing_context: SigningContext, + validator_public: ValidatorId, + ) -> Result { + let (statement_kind, candidate_hash) = match backing_statement.unchecked_payload() { + Statement::Seconded(candidate) => ( + ValidDisputeStatementKind::BackingSeconded(signing_context.parent_hash), + candidate.hash(), + ), + Statement::Valid(candidate_hash) => ( + ValidDisputeStatementKind::BackingValid(signing_context.parent_hash), + *candidate_hash, + ), + }; + + let dispute_statement = DisputeStatement::Valid(statement_kind); + Self::new_checked( + dispute_statement, + candidate_hash, + signing_context.session_index, + validator_public, + backing_statement.unchecked_signature().clone(), + ) + } } /// Any invalid vote (currently only explicit). diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 0a89ed6aecc3..90c3d77257e1 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -224,7 +224,12 @@ pub enum DisputeCoordinatorMessage { /// - or the imported statements are backing/approval votes, which are always accepted. pending_confirmation: oneshot::Sender }, + /// Fetch a list of all recent disputes the co-ordinator is aware of. + /// These are disputes which have occurred any time in recent sessions, + /// and which may have already concluded. + RecentDisputes(oneshot::Sender>), /// Fetch a list of all active disputes that the coordinator is aware of. + /// These disputes are either unconcluded or recently concluded. ActiveDisputes(oneshot::Sender>), /// Get candidate votes for a candidate. QueryCandidateVotes(SessionIndex, CandidateHash, oneshot::Sender>), diff --git a/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md b/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md index 828affb78627..c8b175035735 100644 --- a/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md +++ b/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md @@ -15,7 +15,7 @@ We use this database to encode the following schema: ```rust ("candidate-votes", SessionIndex, CandidateHash) -> Option -"active-disputes" -> ActiveDisputes +"recent-disputes" -> RecentDisputes "earliest-session" -> Option ``` @@ -32,9 +32,21 @@ struct CandidateVotes { invalid: Vec<(InvalidDisputeStatementKind, ValidatorIndex, ValidatorSignature)>, } -struct ActiveDisputes { +// The status of the dispute. +enum DisputeStatus { + // Dispute is still active. + Active, + // Dispute concluded positive (2/3 supermajority) along with what + // timestamp it concluded at. + ConcludedPositive(Timestamp), + // Dispute concluded negative (2/3 supermajority, takes precedence over + // positive in the case of many double-votes). + ConcludedNegative(Timestamp), +} + +struct RecentDisputes { // sorted by session index and then by candidate hash. - disputed: Vec<(SessionIndex, CandidateHash)>, + disputed: Vec<(SessionIndex, CandidateHash, DisputeStatus)>, } ``` @@ -72,7 +84,7 @@ For each leaf in the leaves update: * Fetch the session index for the child of the block with a [`RuntimeApiMessage::SessionIndexForChild`][RuntimeApiMessage]. * If the session index is higher than `state.highest_session`: * update `state.highest_session` - * remove everything with session index less than `state.highest_session - DISPUTE_WINDOW` from the `"active-disputes"` in the DB. + * remove everything with session index less than `state.highest_session - DISPUTE_WINDOW` from the `"recent-disputes"` in the DB. * Use `iter_with_prefix` to remove everything from `"earliest-session"` up to `state.highest_session - DISPUTE_WINDOW` from the DB under `"candidate-votes"`. * Update `"earliest-session"` to be equal to `state.highest_session - DISPUTE_WINDOW`. * For each new block, explicitly or implicitly, under the new leaf, scan for a dispute digest which indicates a rollback. If a rollback is detected, use the ChainApi subsystem to blacklist the chain. @@ -104,7 +116,7 @@ Do nothing. previously one or both had zero length, the candidate is now freshly disputed. 8. If the candidate is not freshly disputed as determined by 7, continue with - 10. If it is freshly disputed now, load `"active-disputes"` and add the + 10. If it is freshly disputed now, load `"recent-disputes"` and add the candidate hash and session index. Then, if we have local statements with regards to that candidate, also continue with 10. Otherwise proceed with 9. 9. Issue a @@ -114,16 +126,20 @@ Do nothing. 10. Write the `CandidateVotes` to the underyling DB. 11. Send back `ImportStatementsResult::ValidImport`. 12. If the dispute now has supermajority votes in the "valid" direction, - according to the `SessionInfo` of the dispute candidate's session, remove - from `"active-disputes"`. -13. If the dispute now has supermajority votes in the "invalid" direction, there - is no need to do anything explicitly. The actual rollback will be handled - during the active leaves update by observing digests from the runtime. -14. Write `"active-disputes"` + according to the `SessionInfo` of the dispute candidate's session, the + `DisputeStatus` should be set to `ConcludedPositive(now)` unless it was + already `ConcludedNegative`. +13. If the dispute now has supermajority votes in the "invalid" direction, + the `DisputeStatus` should be set to `ConcludedNegative(now)`. If it + was `ConcludedPositive` before, the timestamp `now` should be copied + from the previous status. It will be pruned after some time and all chains + containing the disputed block will be reverted by the runtime and + chain-selection subsystem. +14. Write `"recent-disputes"` ### On `DisputeCoordinatorMessage::ActiveDisputes` -* Load `"active-disputes"` and return the data contained within. +* Load `"recent-disputes"` and filter out any disputes which have been concluded for over 5 minutes. Return the filtered data ### On `DisputeCoordinatorMessage::QueryCandidateVotes` @@ -140,11 +156,11 @@ Do nothing. ### On `DisputeCoordinatorMessage::DetermineUndisputedChain` -* Load `"active-disputes"`. +* Load `"recent-disputes"`. * Deconstruct into parts `{ base_number, block_descriptions, rx }` * Starting from the beginning of `block_descriptions`: - 1. Check the `ActiveDisputes` for a dispute of each candidate in the block description. - 1. If there is a dispute, exit the loop. + 1. Check the `RecentDisputes` for a dispute of each candidate in the block description. + 1. If there is a dispute which is active or concluded negative, exit the loop. * For the highest index `i` reached in the `block_descriptions`, send `(base_number + i + 1, block_hash)` on the channel, unless `i` is 0, in which case `None` should be sent. The `block_hash` is determined by inspecting `block_descriptions[i]`. [DisputeTypes]: ../../types/disputes.md diff --git a/roadmap/implementers-guide/src/node/utility/provisioner.md b/roadmap/implementers-guide/src/node/utility/provisioner.md index 0e8aa059635b..752d360b9a38 100644 --- a/roadmap/implementers-guide/src/node/utility/provisioner.md +++ b/roadmap/implementers-guide/src/node/utility/provisioner.md @@ -75,25 +75,11 @@ The end result of this process is a vector of `BackedCandidate`s, sorted in orde This is the point at which the block author provides further votes to active disputes or initiates new disputes in the runtime state. -We must take care not to overwhelm the "spam slots" of the chain. That is, to avoid too many votes from the same validators being placed into the chain, which would trigger the anti-spam protection functionality of the [disputes module](../../runtime/disputes.md). +The block-authoring logic of the runtime has an extra step between handling the inherent-data and producing the actual inherent call, which we assume performs the work of filtering out disputes which are not relevant to the on-chain state. To select disputes: -- Make a `DisputesInfo` runtime API call and decompose into `{ spam_slots, disputes }`. Bind `disputes` to `onchain_disputes`. -- Issue a `DisputeCoordinatorMessage::ActiveDisputes` message and wait for the response. Assign the value to `offchain_disputes`. -- Make a `CandidatesIncluded` runtime API call for each dispute in `offchain_disputes` and tag each offchain dispute as local if the result for it is `true`. -- Initialize `NewSpamSlots: Map<(SessionIndex, ValidatorIndex), u32>` as an empty map. -- For each dispute in `offchain_disputes`: - 1. Make a `RuntimeApiRequest::SessionInfo` against the parent hash for the session of the dispute. If `None`, continue - this chain is in the past relative to the session the dispute belongs to and we can import it when it reaches that session. - 1. Load the spam slots from `spam_slots` for the given session. If it isn't present, treat as though all zeros. - 1. construct a `DisputeStatementSet` of all offchain votes we are aware of that the onchain doesn't already have a `valid` or `invalid` bit set for, respectively. - 1. If the `onchain_disputes` contains an entry for the dispute, load that. Otherwise, treat as empty. - 1. If the offchain dispute is local or the `DisputeStatementSet` and the onchain dispute together have at least `byzantine_threshold + 1` validators in it, continue to the next offchain dispute. - 1. Otherwise - 1. Filter out all votes from the `DisputeStatementSet` where the amount of spam slots occupied on-chain by the validator, plus the `NewSpamSlots` value, plus 1, is greater than `spam_slots.max_spam_slots`. - 1. After filtering, if either the `valid` or `invalid` lists in the combination of the `DisputeStatementSet` and the onchain dispute is empty, skip this dispute. - 1. Add 1 to the `NewSpamSlots` value for each validator in the `DisputeStatementSet`. -- Construct a `MultiDisputeStatementSet` for each `DisputeStatement` and return that. +- Issue a `DisputeCoordinatorMessage::RecentDisputes` message and wait for the response. This is a set of all disputes in recent sessions which we are aware of. ### Determining Bitfield Availability diff --git a/roadmap/implementers-guide/src/types/overseer-protocol.md b/roadmap/implementers-guide/src/types/overseer-protocol.md index 02dd4b677766..b4f9b9a47a39 100644 --- a/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -440,7 +440,11 @@ enum DisputeCoordinatorMessage { /// successfully. pending_confirmation: oneshot::Sender }, + /// Fetch a list of all recent disputes that the co-ordinator is aware of. + /// These are disputes which have occured any time in recent sessions, which may have already concluded. + RecentDisputes(ResponseChannel>), /// Fetch a list of all active disputes that the co-ordinator is aware of. + /// These disputes are either unconcluded or recently concluded. ActiveDisputes(ResponseChannel>), /// Get candidate votes for a candidate. QueryCandidateVotes(SessionIndex, CandidateHash, ResponseChannel>),