diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 985cc9d6c541..af46ad55fa04 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -27,7 +27,7 @@ use polkadot_node_primitives::{ approval::{ BlockApprovalMeta, DelayTranche, IndirectAssignmentCert, IndirectSignedApprovalVote, }, - SignedDisputeStatement, ValidationResult, + SignedDisputeStatement, ValidationResult, APPROVAL_EXECUTION_TIMEOUT, }; use polkadot_node_subsystem::{ errors::RecoveryError, @@ -2235,6 +2235,7 @@ async fn launch_approval( validation_code, candidate.descriptor.clone(), available_data.pov, + APPROVAL_EXECUTION_TIMEOUT, val_tx, ) .into(), diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index bd24244a01f8..38fab2d791c2 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -32,6 +32,7 @@ use futures::{ use polkadot_node_primitives::{ AvailableData, PoV, SignedDisputeStatement, SignedFullStatement, Statement, ValidationResult, + BACKING_EXECUTION_TIMEOUT, }; use polkadot_node_subsystem_util::{ self as util, @@ -380,7 +381,12 @@ async fn request_candidate_validation( let (tx, rx) = oneshot::channel(); sender - .send_message(CandidateValidationMessage::ValidateFromChainState(candidate, pov, tx)) + .send_message(CandidateValidationMessage::ValidateFromChainState( + candidate, + pov, + BACKING_EXECUTION_TIMEOUT, + tx, + )) .await; match rx.await { diff --git a/node/core/backing/src/tests.rs b/node/core/backing/src/tests.rs index 68c1e30dfd31..a262dd45d470 100644 --- a/node/core/backing/src/tests.rs +++ b/node/core/backing/src/tests.rs @@ -317,9 +317,10 @@ fn backing_second_works() { CandidateValidationMessage::ValidateFromChainState( c, pov, + timeout, tx, ) - ) if pov == pov && &c == candidate.descriptor() => { + ) if pov == pov && &c == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => { tx.send(Ok( ValidationResult::Valid(CandidateCommitments { head_data: expected_head_data.clone(), @@ -476,9 +477,10 @@ fn backing_works() { CandidateValidationMessage::ValidateFromChainState( c, pov, + timeout, tx, ) - ) if pov == pov && &c == candidate_a.descriptor() => { + ) if pov == pov && &c == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => { tx.send(Ok( ValidationResult::Valid(CandidateCommitments { head_data: expected_head_data.clone(), @@ -669,9 +671,10 @@ fn backing_works_while_validation_ongoing() { CandidateValidationMessage::ValidateFromChainState( c, pov, + timeout, tx, ) - ) if pov == pov && &c == candidate_a.descriptor() => { + ) if pov == pov && &c == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => { // we never validate the candidate. our local node // shouldn't issue any statements. std::mem::forget(tx); @@ -834,9 +837,10 @@ fn backing_misbehavior_works() { CandidateValidationMessage::ValidateFromChainState( c, pov, + timeout, tx, ) - ) if pov == pov && &c == candidate_a.descriptor() => { + ) if pov == pov && &c == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => { tx.send(Ok( ValidationResult::Valid(CandidateCommitments { head_data: expected_head_data.clone(), @@ -980,9 +984,10 @@ fn backing_dont_second_invalid() { CandidateValidationMessage::ValidateFromChainState( c, pov, + timeout, tx, ) - ) if pov == pov && &c == candidate_a.descriptor() => { + ) if pov == pov && &c == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => { tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap(); } ); @@ -1008,9 +1013,10 @@ fn backing_dont_second_invalid() { CandidateValidationMessage::ValidateFromChainState( c, pov, + timeout, tx, ) - ) if pov == pov && &c == candidate_b.descriptor() => { + ) if pov == pov && &c == candidate_b.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => { tx.send(Ok( ValidationResult::Valid(CandidateCommitments { head_data: expected_head_data.clone(), @@ -1138,9 +1144,10 @@ fn backing_second_after_first_fails_works() { CandidateValidationMessage::ValidateFromChainState( c, pov, + timeout, tx, ) - ) if pov == pov && &c == candidate.descriptor() => { + ) if pov == pov && &c == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => { tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap(); } ); @@ -1186,6 +1193,7 @@ fn backing_second_after_first_fails_works() { _, pov, _, + _, ) ) => { assert_eq!(&*pov, &pov_to_second); @@ -1270,9 +1278,10 @@ fn backing_works_after_failed_validation() { CandidateValidationMessage::ValidateFromChainState( c, pov, + timeout, tx, ) - ) if pov == pov && &c == candidate.descriptor() => { + ) if pov == pov && &c == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => { tx.send(Err(ValidationFailed("Internal test error".into()))).unwrap(); } ); @@ -1646,9 +1655,10 @@ fn retry_works() { CandidateValidationMessage::ValidateFromChainState( c, pov, + timeout, _tx, ) - ) if pov == pov && &c == candidate.descriptor() + ) if pov == pov && &c == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT ); virtual_overseer }); diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index 9d2852efcc59..c9e78db77c47 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -48,7 +48,7 @@ use parity_scale_codec::Encode; use futures::{channel::oneshot, prelude::*}; -use std::{path::PathBuf, sync::Arc}; +use std::{path::PathBuf, sync::Arc, time::Duration}; use async_trait::async_trait; @@ -135,6 +135,7 @@ where CandidateValidationMessage::ValidateFromChainState( descriptor, pov, + timeout, response_sender, ) => { let bg = { @@ -149,6 +150,7 @@ where validation_host, descriptor, pov, + timeout, &metrics, ) .await; @@ -165,6 +167,7 @@ where validation_code, descriptor, pov, + timeout, response_sender, ) => { let bg = { @@ -179,6 +182,7 @@ where validation_code, descriptor, pov, + timeout, &metrics, ) .await; @@ -322,6 +326,7 @@ async fn validate_from_chain_state( validation_host: ValidationHost, descriptor: CandidateDescriptor, pov: Arc, + timeout: Duration, metrics: &Metrics, ) -> Result where @@ -347,6 +352,7 @@ where validation_code, descriptor.clone(), pov, + timeout, metrics, ) .await; @@ -377,6 +383,7 @@ async fn validate_candidate_exhaustive( validation_code: ValidationCode, descriptor: CandidateDescriptor, pov: Arc, + timeout: Duration, metrics: &Metrics, ) -> Result { let _timer = metrics.time_validate_candidate_exhaustive(); @@ -430,7 +437,7 @@ async fn validate_candidate_exhaustive( }; let result = validation_backend - .validate_candidate(raw_validation_code.to_vec(), params) + .validate_candidate(raw_validation_code.to_vec(), timeout, params) .await; if let Err(ref e) = result { @@ -475,6 +482,7 @@ trait ValidationBackend { async fn validate_candidate( &mut self, raw_validation_code: Vec, + timeout: Duration, params: ValidationParams, ) -> Result; } @@ -484,12 +492,14 @@ impl ValidationBackend for ValidationHost { async fn validate_candidate( &mut self, raw_validation_code: Vec, + timeout: Duration, params: ValidationParams, ) -> Result { let (tx, rx) = oneshot::channel(); if let Err(err) = self .execute_pvf( Pvf::from_code(raw_validation_code), + timeout, params.encode(), polkadot_node_core_pvf::Priority::Normal, tx, diff --git a/node/core/candidate-validation/src/tests.rs b/node/core/candidate-validation/src/tests.rs index f067ead6cd3c..30fa96ccc398 100644 --- a/node/core/candidate-validation/src/tests.rs +++ b/node/core/candidate-validation/src/tests.rs @@ -341,6 +341,7 @@ impl ValidationBackend for MockValidatorBackend { async fn validate_candidate( &mut self, _raw_validation_code: Vec, + _timeout: Duration, _params: ValidationParams, ) -> Result { self.result.clone() @@ -384,6 +385,7 @@ fn candidate_validation_ok_is_ok() { validation_code, descriptor, Arc::new(pov), + Duration::from_secs(0), &Default::default(), )) .unwrap(); @@ -426,6 +428,7 @@ fn candidate_validation_bad_return_is_invalid() { validation_code, descriptor, Arc::new(pov), + Duration::from_secs(0), &Default::default(), )) .unwrap(); @@ -461,6 +464,7 @@ fn candidate_validation_timeout_is_internal_error() { validation_code, descriptor, Arc::new(pov), + Duration::from_secs(0), &Default::default(), )); @@ -495,6 +499,7 @@ fn candidate_validation_code_mismatch_is_invalid() { validation_code, descriptor, Arc::new(pov), + Duration::from_secs(0), &Default::default(), )) .unwrap(); @@ -534,6 +539,7 @@ fn compressed_code_works() { validation_code, descriptor, Arc::new(pov), + Duration::from_secs(0), &Default::default(), )); @@ -573,6 +579,7 @@ fn code_decompression_failure_is_invalid() { validation_code, descriptor, Arc::new(pov), + Duration::from_secs(0), &Default::default(), )); @@ -613,6 +620,7 @@ fn pov_decompression_failure_is_invalid() { validation_code, descriptor, Arc::new(pov), + Duration::from_secs(0), &Default::default(), )); diff --git a/node/core/dispute-participation/src/lib.rs b/node/core/dispute-participation/src/lib.rs index 85b31f0a86f4..21258ad5be11 100644 --- a/node/core/dispute-participation/src/lib.rs +++ b/node/core/dispute-participation/src/lib.rs @@ -22,7 +22,7 @@ use futures::{channel::oneshot, prelude::*}; -use polkadot_node_primitives::ValidationResult; +use polkadot_node_primitives::{ValidationResult, APPROVAL_EXECUTION_TIMEOUT}; use polkadot_node_subsystem::{ errors::{RecoveryError, RuntimeApiError}, messages::{ @@ -269,11 +269,16 @@ async fn participate( // we issue a request to validate the candidate with the provided exhaustive // parameters + // + // We use the approval execution timeout because this is intended to + // be run outside of backing and therefore should be subject to the + // same level of leeway. ctx.send_message(CandidateValidationMessage::ValidateFromExhaustive( available_data.validation_data, validation_code, candidate_receipt.descriptor.clone(), available_data.pov, + APPROVAL_EXECUTION_TIMEOUT, validation_tx, )) .await; diff --git a/node/core/dispute-participation/src/tests.rs b/node/core/dispute-participation/src/tests.rs index e2c98af53b5e..513f673f81a2 100644 --- a/node/core/dispute-participation/src/tests.rs +++ b/node/core/dispute-participation/src/tests.rs @@ -295,8 +295,8 @@ fn cast_invalid_vote_if_validation_fails_or_is_invalid() { assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( - CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, tx) - ) => { + CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx) + ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::Timeout))).unwrap(); }, "overseer did not receive candidate validation message", @@ -331,8 +331,8 @@ fn cast_invalid_vote_if_validation_passes_but_commitments_dont_match() { assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( - CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, tx) - ) => { + CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx) + ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { let mut commitments = CandidateCommitments::default(); // this should lead to a commitments hash mismatch commitments.processed_downward_messages = 42; @@ -371,8 +371,8 @@ fn cast_valid_vote_if_validation_passes() { assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( - CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, tx) - ) => { + CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx) + ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { tx.send(Ok(ValidationResult::Valid(Default::default(), Default::default()))).unwrap(); }, "overseer did not receive candidate validation message", @@ -408,8 +408,8 @@ fn failure_to_store_available_data_does_not_preclude_participation() { assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( - CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, tx) - ) => { + CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx) + ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { tx.send(Err(ValidationFailed("fail".to_string()))).unwrap(); }, "overseer did not receive candidate validation message", diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs index 09e848196820..9376d7d76f78 100644 --- a/node/core/pvf/src/execute/queue.rs +++ b/node/core/pvf/src/execute/queue.rs @@ -38,11 +38,17 @@ slotmap::new_key_type! { struct Worker; } #[derive(Debug)] pub enum ToQueue { - Enqueue { artifact: ArtifactPathId, params: Vec, result_tx: ResultSender }, + Enqueue { + artifact: ArtifactPathId, + execution_timeout: Duration, + params: Vec, + result_tx: ResultSender, + }, } struct ExecuteJob { artifact: ArtifactPathId, + execution_timeout: Duration, params: Vec, result_tx: ResultSender, } @@ -167,14 +173,14 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) { } fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { - let ToQueue::Enqueue { artifact, params, result_tx } = to_queue; + let ToQueue::Enqueue { artifact, execution_timeout, params, result_tx } = to_queue; tracing::debug!( target: LOG_TARGET, validation_code_hash = ?artifact.id.code_hash, "enqueueing an artifact for execution", ); queue.metrics.execute_enqueued(); - let job = ExecuteJob { artifact, params, result_tx }; + let job = ExecuteJob { artifact, execution_timeout, params, result_tx }; if let Some(available) = queue.workers.find_available() { assign(queue, available, job); @@ -326,7 +332,13 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) { queue.mux.push( async move { let _timer = execution_timer; - let outcome = super::worker::start_work(idle, job.artifact.clone(), job.params).await; + let outcome = super::worker::start_work( + idle, + job.artifact.clone(), + job.execution_timeout, + job.params, + ) + .await; QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx) } .boxed(), diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index 97fe5aec3dbf..86e892672d9e 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -34,8 +34,6 @@ use parity_scale_codec::{Decode, Encode}; use polkadot_parachain::primitives::ValidationResult; use std::time::{Duration, Instant}; -const EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); - /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// /// The program should be able to handle ` execute-worker ` invocation. @@ -69,6 +67,7 @@ pub enum Outcome { pub async fn start_work( worker: IdleWorker, artifact: ArtifactPathId, + execution_timeout: Duration, validation_params: Vec, ) -> Outcome { let IdleWorker { mut stream, pid } = worker; @@ -108,7 +107,7 @@ pub async fn start_work( Ok(response) => response, } }, - _ = Delay::new(EXECUTION_TIMEOUT).fuse() => { + _ = Delay::new(execution_timeout).fuse() => { tracing::warn!( target: LOG_TARGET, worker_pid = %pid, diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 89b230bc90d7..40c30ca65c21 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -48,8 +48,8 @@ pub struct ValidationHost { } impl ValidationHost { - /// Execute PVF with the given code, parameters and priority. The result of execution will be sent - /// to the provided result sender. + /// Execute PVF with the given code, execution timeout, parameters and priority. + /// The result of execution will be sent to the provided result sender. /// /// This is async to accommodate the fact a possibility of back-pressure. In the vast majority of /// situations this function should return immediately. @@ -58,12 +58,13 @@ impl ValidationHost { pub async fn execute_pvf( &mut self, pvf: Pvf, + execution_timeout: Duration, params: Vec, priority: Priority, result_tx: ResultSender, ) -> Result<(), String> { self.to_host_tx - .send(ToHost::ExecutePvf { pvf, params, priority, result_tx }) + .send(ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx }) .await .map_err(|_| "the inner loop hung up".to_string()) } @@ -83,8 +84,16 @@ impl ValidationHost { } enum ToHost { - ExecutePvf { pvf: Pvf, params: Vec, priority: Priority, result_tx: ResultSender }, - HeadsUp { active_pvfs: Vec }, + ExecutePvf { + pvf: Pvf, + execution_timeout: Duration, + params: Vec, + priority: Priority, + result_tx: ResultSender, + }, + HeadsUp { + active_pvfs: Vec, + }, } /// Configuration for the validation host. @@ -200,6 +209,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future, result_tx: ResultSender, } @@ -210,11 +220,18 @@ struct PendingExecutionRequest { struct AwaitingPrepare(HashMap>); impl AwaitingPrepare { - fn add(&mut self, artifact_id: ArtifactId, params: Vec, result_tx: ResultSender) { - self.0 - .entry(artifact_id) - .or_default() - .push(PendingExecutionRequest { params, result_tx }); + fn add( + &mut self, + artifact_id: ArtifactId, + execution_timeout: Duration, + params: Vec, + result_tx: ResultSender, + ) { + self.0.entry(artifact_id).or_default().push(PendingExecutionRequest { + execution_timeout, + params, + result_tx, + }); } fn take(&mut self, artifact_id: &ArtifactId) -> Vec { @@ -360,7 +377,7 @@ async fn handle_to_host( to_host: ToHost, ) -> Result<(), Fatal> { match to_host { - ToHost::ExecutePvf { pvf, params, priority, result_tx } => { + ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx } => { handle_execute_pvf( cache_path, artifacts, @@ -368,6 +385,7 @@ async fn handle_to_host( execute_queue, awaiting_prepare, pvf, + execution_timeout, params, priority, result_tx, @@ -389,6 +407,7 @@ async fn handle_execute_pvf( execute_queue: &mut mpsc::Sender, awaiting_prepare: &mut AwaitingPrepare, pvf: Pvf, + execution_timeout: Duration, params: Vec, priority: Priority, result_tx: ResultSender, @@ -404,6 +423,7 @@ async fn handle_execute_pvf( execute_queue, execute::ToQueue::Enqueue { artifact: ArtifactPathId::new(artifact_id, cache_path), + execution_timeout, params, result_tx, }, @@ -417,7 +437,7 @@ async fn handle_execute_pvf( ) .await?; - awaiting_prepare.add(artifact_id, params, result_tx); + awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); }, } } else { @@ -426,7 +446,7 @@ async fn handle_execute_pvf( artifacts.insert_preparing(artifact_id.clone()); send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?; - awaiting_prepare.add(artifact_id, params, result_tx); + awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); } return Ok(()) @@ -499,7 +519,7 @@ async fn handle_prepare_done( // It's finally time to dispatch all the execution requests that were waiting for this artifact // to be prepared. let pending_requests = awaiting_prepare.take(&artifact_id); - for PendingExecutionRequest { params, result_tx } in pending_requests { + for PendingExecutionRequest { execution_timeout, params, result_tx } in pending_requests { if result_tx.is_canceled() { // Preparation could've taken quite a bit of time and the requester may be not interested // in execution anymore, in which case we just skip the request. @@ -510,6 +530,7 @@ async fn handle_prepare_done( execute_queue, execute::ToQueue::Enqueue { artifact: ArtifactPathId::new(artifact_id.clone(), cache_path), + execution_timeout, params, result_tx, }, @@ -597,6 +618,8 @@ mod tests { use assert_matches::assert_matches; use futures::future::BoxFuture; + const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); + #[async_std::test] async fn pulse_test() { let pulse = pulse_every(Duration::from_millis(100)); @@ -840,9 +863,15 @@ mod tests { .await; let (result_tx, _result_rx) = oneshot::channel(); - host.execute_pvf(Pvf::from_discriminator(1), vec![], Priority::Critical, result_tx) - .await - .unwrap(); + host.execute_pvf( + Pvf::from_discriminator(1), + TEST_EXECUTION_TIMEOUT, + vec![], + Priority::Critical, + result_tx, + ) + .await + .unwrap(); run_until( &mut test.run, @@ -862,13 +891,20 @@ mod tests { let mut host = test.host_handle(); let (result_tx, result_rx_pvf_1_1) = oneshot::channel(); - host.execute_pvf(Pvf::from_discriminator(1), b"pvf1".to_vec(), Priority::Normal, result_tx) - .await - .unwrap(); + host.execute_pvf( + Pvf::from_discriminator(1), + TEST_EXECUTION_TIMEOUT, + b"pvf1".to_vec(), + Priority::Normal, + result_tx, + ) + .await + .unwrap(); let (result_tx, result_rx_pvf_1_2) = oneshot::channel(); host.execute_pvf( Pvf::from_discriminator(1), + TEST_EXECUTION_TIMEOUT, b"pvf1".to_vec(), Priority::Critical, result_tx, @@ -877,9 +913,15 @@ mod tests { .unwrap(); let (result_tx, result_rx_pvf_2) = oneshot::channel(); - host.execute_pvf(Pvf::from_discriminator(2), b"pvf2".to_vec(), Priority::Normal, result_tx) - .await - .unwrap(); + host.execute_pvf( + Pvf::from_discriminator(2), + TEST_EXECUTION_TIMEOUT, + b"pvf2".to_vec(), + Priority::Normal, + result_tx, + ) + .await + .unwrap(); assert_matches!( test.poll_and_recv_to_prepare_queue().await, @@ -947,9 +989,15 @@ mod tests { let mut host = test.host_handle(); let (result_tx, result_rx) = oneshot::channel(); - host.execute_pvf(Pvf::from_discriminator(1), b"pvf1".to_vec(), Priority::Normal, result_tx) - .await - .unwrap(); + host.execute_pvf( + Pvf::from_discriminator(1), + TEST_EXECUTION_TIMEOUT, + b"pvf1".to_vec(), + Priority::Normal, + result_tx, + ) + .await + .unwrap(); assert_matches!( test.poll_and_recv_to_prepare_queue().await, diff --git a/node/core/pvf/tests/it/main.rs b/node/core/pvf/tests/it/main.rs index 3689217880ef..e8fd7b665aa3 100644 --- a/node/core/pvf/tests/it/main.rs +++ b/node/core/pvf/tests/it/main.rs @@ -20,11 +20,13 @@ use polkadot_node_core_pvf::{ start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost, }; use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult}; +use std::time::Duration; mod adder; mod worker_common; const PUPPET_EXE: &str = env!("CARGO_BIN_EXE_puppet_worker"); +const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); struct TestHost { _cache_dir: tempfile::TempDir, @@ -64,6 +66,7 @@ impl TestHost { .await .execute_pvf( Pvf::from_code(code.into()), + TEST_EXECUTION_TIMEOUT, params.encode(), polkadot_node_core_pvf::Priority::Normal, result_tx, diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index 0ff8201594fb..41ec66642976 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -75,6 +75,7 @@ impl Subsystem1 { let msg = CandidateValidationMessage::ValidateFromChainState( Default::default(), PoV { block_data: BlockData(Vec::new()) }.into(), + Default::default(), tx, ); ctx.send_message(::AllMessages::from(msg)) diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index 39eb91e0f6d6..7fe1ed701a83 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -112,6 +112,7 @@ where ctx.send_message(CandidateValidationMessage::ValidateFromChainState( Default::default(), PoV { block_data: BlockData(Vec::new()) }.into(), + Default::default(), tx, )) .await; @@ -791,7 +792,12 @@ where fn test_candidate_validation_msg() -> CandidateValidationMessage { let (sender, _) = oneshot::channel(); let pov = Arc::new(PoV { block_data: BlockData(Vec::new()) }); - CandidateValidationMessage::ValidateFromChainState(Default::default(), pov, sender) + CandidateValidationMessage::ValidateFromChainState( + Default::default(), + pov, + Default::default(), + sender, + ) } fn test_candidate_backing_msg() -> CandidateBackingMessage { diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index 6e8490b67b91..adc8846f4298 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -22,7 +22,7 @@ #![deny(missing_docs)] -use std::{convert::TryFrom, pin::Pin}; +use std::{convert::TryFrom, pin::Pin, time::Duration}; use bounded_vec::BoundedVec; use futures::Future; @@ -71,6 +71,17 @@ pub const POV_BOMB_LIMIT: usize = (MAX_POV_SIZE * 4u32) as usize; /// Number of sessions we want to consider in disputes. pub const DISPUTE_WINDOW: SessionIndex = 6; +/// The amount of time to spend on execution during backing. +pub const BACKING_EXECUTION_TIMEOUT: Duration = Duration::from_secs(2); + +/// The amount of time to spend on execution during approval or disputes. +/// +/// This is deliberately much longer than the backing execution timeout to +/// ensure that in the absence of extremely large disparities between hardware, +/// blocks that pass backing are considerd executable by approval checkers or +/// dispute participants. +pub const APPROVAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(6); + /// The cumulative weight of a block in a fork-choice rule. pub type BlockWeight = u32; diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index c6fb92736c51..ea70d3b2707a 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -51,6 +51,7 @@ use polkadot_statement_table::v1::Misbehavior; use std::{ collections::{BTreeMap, HashSet}, sync::Arc, + time::Duration, }; /// Network events as transmitted to other subsystems, wrapped in their message types. @@ -114,6 +115,8 @@ pub enum CandidateValidationMessage { ValidateFromChainState( CandidateDescriptor, Arc, + /// Execution timeout + Duration, oneshot::Sender>, ), /// Validate a candidate with provided, exhaustive parameters for validation. @@ -130,6 +133,8 @@ pub enum CandidateValidationMessage { ValidationCode, CandidateDescriptor, Arc, + /// Execution timeout + Duration, oneshot::Sender>, ), } @@ -138,8 +143,8 @@ impl CandidateValidationMessage { /// If the current variant contains the relay parent hash, return it. pub fn relay_parent(&self) -> Option { match self { - Self::ValidateFromChainState(_, _, _) => None, - Self::ValidateFromExhaustive(_, _, _, _, _) => None, + Self::ValidateFromChainState(_, _, _, _) => None, + Self::ValidateFromExhaustive(_, _, _, _, _, _) => None, } } } diff --git a/roadmap/implementers-guide/src/node/approval/approval-voting.md b/roadmap/implementers-guide/src/node/approval/approval-voting.md index c6367c050a04..adb95e1f6389 100644 --- a/roadmap/implementers-guide/src/node/approval/approval-voting.md +++ b/roadmap/implementers-guide/src/node/approval/approval-voting.md @@ -281,7 +281,7 @@ On receiving an `ApprovedAncestor(Hash, BlockNumber, response_channel)`: * Load the historical validation code of the parachain by dispatching a `RuntimeApiRequest::ValidationCodeByHash(descriptor.validation_code_hash)` against the state of `block_hash`. * Spawn a background task with a clone of `background_tx` * Wait for the available data - * Issue a `CandidateValidationMessage::ValidateFromExhaustive` message + * Issue a `CandidateValidationMessage::ValidateFromExhaustive` message with `APPROVAL_EXECUTION_TIMEOUT` as the timeout parameter. * Wait for the result of validation * Check that the result of validation, if valid, matches the commitments in the receipt. * If valid, issue a message on `background_tx` detailing the request. diff --git a/roadmap/implementers-guide/src/node/backing/candidate-backing.md b/roadmap/implementers-guide/src/node/backing/candidate-backing.md index c39ffabdcd98..4b25a89cb1a5 100644 --- a/roadmap/implementers-guide/src/node/backing/candidate-backing.md +++ b/roadmap/implementers-guide/src/node/backing/candidate-backing.md @@ -123,7 +123,7 @@ Dispatch a [`AvailabilityDistributionMessage`][ADM]`::FetchPoV{ validator_index, ### Validate PoV Block Create a `(sender, receiver)` pair. -Dispatch a `CandidateValidationMessage::Validate(validation function, candidate, pov, sender)` and listen on the receiver for a response. +Dispatch a `CandidateValidationMessage::Validate(validation function, candidate, pov, BACKING_EXECUTION_TIMEOUT, sender)` and listen on the receiver for a response. ### Distribute Signed Statement diff --git a/roadmap/implementers-guide/src/node/disputes/dispute-participation.md b/roadmap/implementers-guide/src/node/disputes/dispute-participation.md index b3e1c11fa2be..fc0517fa4e1e 100644 --- a/roadmap/implementers-guide/src/node/disputes/dispute-participation.md +++ b/roadmap/implementers-guide/src/node/disputes/dispute-participation.md @@ -48,7 +48,7 @@ Conclude. * If the data is recovered, dispatch a [`RuntimeApiMessage::ValidationCodeByHash`][RuntimeApiMessage] with the parameters `(candidate_receipt.descriptor.validation_code_hash)` at `state.recent_block.hash`. * Dispatch a [`AvailabilityStoreMessage::StoreAvailableData`][AvailabilityStoreMessage] with the data. * If the code is not fetched from the chain, return. This should be impossible with correct relay chain configuration, at least if chain synchronization is working correctly. -* Dispatch a [`CandidateValidationMessage::ValidateFromExhaustive`][CandidateValidationMessage] with the available data and the validation code. +* Dispatch a [`CandidateValidationMessage::ValidateFromExhaustive`][CandidateValidationMessage] with the available data and the validation code and `APPROVAL_EXECUTION_TIMEOUT` as the timeout parameter. * If the validation result is `Invalid`, [cast invalid votes](#cast-votes) and return. * If the validation fails, [cast invalid votes](#cast-votes) and return. * If the validation succeeds, compute the `CandidateCommitments` based on the validation result and compare against the candidate receipt's `commitments_hash`. If they match, [cast valid votes](#cast-votes) and if not, [cast invalid votes](#cast-votes). diff --git a/roadmap/implementers-guide/src/types/overseer-protocol.md b/roadmap/implementers-guide/src/types/overseer-protocol.md index 78d536f1a21c..f3195e713399 100644 --- a/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -785,6 +785,9 @@ enum ValidationResult { Invalid, } +const BACKING_EXECUTION_TIMEOUT: Duration = 2 seconds; +const APPROVAL_EXECUTION_TIMEOUT: Duration = 6 seconds; + /// Messages received by the Validation subsystem. /// /// ## Validation Requests @@ -807,6 +810,7 @@ pub enum CandidateValidationMessage { ValidateFromChainState( CandidateDescriptor, Arc, + Duration, // Execution timeout. oneshot::Sender>, ), /// Validate a candidate with provided, exhaustive parameters for validation. @@ -823,6 +827,7 @@ pub enum CandidateValidationMessage { ValidationCode, CandidateDescriptor, Arc, + Duration, // Execution timeout. oneshot::Sender>, ), }