Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
PVF preparation: do not conflate errors (#6384)
Browse files Browse the repository at this point in the history
* PVF preparation: do not conflate errors

+ Adds some more granularity to the prepare errors.
+ Better distinguish whether errors occur on the host side or the worker.
+ Do not kill the worker if the error happened on the host side.
+ Do not retry preparation if the error was `Panic`.
+ Removes unnecessary indirection with `Selected` type.

* Add missing docs, resolve TODOs

* Address review comments and remove TODOs

* Fix error in CI

* Undo unnecessary change

* Update couple of comments

* Don't return error for stream shutdown

* Update node/core/pvf/src/worker_common.rs
  • Loading branch information
mrcnski authored Dec 20, 2022
1 parent 86c134e commit 9cf76e3
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 108 deletions.
17 changes: 9 additions & 8 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,12 @@ where

match validation_backend.precheck_pvf(validation_code).await {
Ok(_) => PreCheckOutcome::Valid,
Err(prepare_err) => match prepare_err {
PrepareError::Prevalidation(_) |
PrepareError::Preparation(_) |
PrepareError::Panic(_) => PreCheckOutcome::Invalid,
PrepareError::TimedOut | PrepareError::DidNotMakeIt => PreCheckOutcome::Failed,
},
Err(prepare_err) =>
if prepare_err.is_deterministic() {
PreCheckOutcome::Invalid
} else {
PreCheckOutcome::Failed
},
}
}

Expand Down Expand Up @@ -667,10 +667,11 @@ impl ValidationBackend for ValidationHost {
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> {
let (tx, rx) = oneshot::channel();
if let Err(_) = self.precheck_pvf(pvf, tx).await {
return Err(PrepareError::DidNotMakeIt)
// Return an IO error if there was an error communicating with the host.
return Err(PrepareError::IoErr)
}

let precheck_result = rx.await.or(Err(PrepareError::DidNotMakeIt))?;
let precheck_result = rx.await.or(Err(PrepareError::IoErr))?;

precheck_result
}
Expand Down
2 changes: 1 addition & 1 deletion node/core/candidate-validation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1053,5 +1053,5 @@ fn precheck_properly_classifies_outcomes() {
inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid);

inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed);
inner(Err(PrepareError::DidNotMakeIt), PreCheckOutcome::Failed);
inner(Err(PrepareError::IoErr), PreCheckOutcome::Failed);
}
74 changes: 48 additions & 26 deletions node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use parity_scale_codec::{Decode, Encode};
use std::{any::Any, time::Duration};
use std::{any::Any, fmt, time::Duration};

/// Result of PVF preparation performed by the validation host. Contains the elapsed CPU time if
/// successful
Expand All @@ -32,9 +32,46 @@ pub enum PrepareError {
Panic(String),
/// Failed to prepare the PVF due to the time limit.
TimedOut,
/// This state indicates that the process assigned to prepare the artifact wasn't responsible
/// or were killed. This state is reported by the validation host (not by the worker).
DidNotMakeIt,
/// An IO error occurred while receiving the result from the worker process. This state is reported by the
/// validation host (not by the worker).
IoErr,
/// The temporary file for the artifact could not be created at the given cache path. This state is reported by the
/// validation host (not by the worker).
CreateTmpFileErr(String),
/// The response from the worker is received, but the file cannot be renamed (moved) to the final destination
/// location. This state is reported by the validation host (not by the worker).
RenameTmpFileErr(String),
}

impl PrepareError {
/// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those
/// errors depend on the PVF itself and the sc-executor/wasmtime logic.
///
/// Non-deterministic errors can happen spuriously. Typically, they occur due to resource
/// starvation, e.g. under heavy load or memory pressure. Those errors are typically transient
/// but may persist e.g. if the node is run by overwhelmingly underpowered machine.
pub fn is_deterministic(&self) -> bool {
use PrepareError::*;
match self {
Prevalidation(_) | Preparation(_) | Panic(_) => true,
TimedOut | IoErr | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false,
}
}
}

impl fmt::Display for PrepareError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use PrepareError::*;
match self {
Prevalidation(err) => write!(f, "prevalidation: {}", err),
Preparation(err) => write!(f, "preparation: {}", err),
Panic(err) => write!(f, "panic: {}", err),
TimedOut => write!(f, "prepare: timeout"),
IoErr => write!(f, "prepare: io error while receiving response"),
CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err),
RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err),
}
}
}

/// A error raised during validation of the candidate.
Expand Down Expand Up @@ -81,32 +118,17 @@ pub enum InvalidCandidate {
impl From<PrepareError> for ValidationError {
fn from(error: PrepareError) -> Self {
// Here we need to classify the errors into two errors: deterministic and non-deterministic.
// See [`PrepareError::is_deterministic`].
//
// Non-deterministic errors can happen spuriously. Typically, they occur due to resource
// starvation, e.g. under heavy load or memory pressure. Those errors are typically transient
// but may persist e.g. if the node is run by overwhelmingly underpowered machine.
//
// Deterministic errors should trigger reliably. Those errors depend on the PVF itself and
// the sc-executor/wasmtime logic.
//
// For now, at least until the PVF pre-checking lands, the deterministic errors will be
// treated as `InvalidCandidate`. Should those occur they could potentially trigger disputes.
// We treat the deterministic errors as `InvalidCandidate`. Should those occur they could
// potentially trigger disputes.
//
// All non-deterministic errors are qualified as `InternalError`s and will not trigger
// disputes.
match error {
PrepareError::Prevalidation(err) => ValidationError::InvalidCandidate(
InvalidCandidate::PrepareError(format!("prevalidation: {}", err)),
),
PrepareError::Preparation(err) => ValidationError::InvalidCandidate(
InvalidCandidate::PrepareError(format!("preparation: {}", err)),
),
PrepareError::Panic(err) => ValidationError::InvalidCandidate(
InvalidCandidate::PrepareError(format!("panic: {}", err)),
),
PrepareError::TimedOut => ValidationError::InternalError("prepare: timeout".to_owned()),
PrepareError::DidNotMakeIt =>
ValidationError::InternalError("prepare: did not make it".to_owned()),
if error.is_deterministic() {
ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(error.to_string()))
} else {
ValidationError::InternalError(error.to_string())
}
}
}
Expand Down
17 changes: 8 additions & 9 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,16 +776,15 @@ fn can_retry_prepare_after_failure(
num_failures: u32,
error: &PrepareError,
) -> bool {
use PrepareError::*;
match error {
// Gracefully returned an error, so it will probably be reproducible. Don't retry.
Prevalidation(_) | Preparation(_) => false,
// Retry if the retry cooldown has elapsed and if we have already retried less than
// `NUM_PREPARE_RETRIES` times. IO errors may resolve themselves.
Panic(_) | TimedOut | DidNotMakeIt =>
SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
num_failures <= NUM_PREPARE_RETRIES,
if error.is_deterministic() {
// This error is considered deterministic, so it will probably be reproducible. Don't retry.
return false
}

// Retry if the retry cooldown has elapsed and if we have already retried less than `NUM_PREPARE_RETRIES` times. IO
// errors may resolve themselves.
SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
num_failures <= NUM_PREPARE_RETRIES
}

/// A stream that yields a pulse continuously at a given interval.
Expand Down
84 changes: 58 additions & 26 deletions node/core/pvf/src/prepare/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::{
LOG_TARGET,
};
use always_assert::never;
use assert_matches::assert_matches;
use async_std::path::{Path, PathBuf};
use futures::{
channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt,
Expand Down Expand Up @@ -232,7 +231,7 @@ fn handle_to_pool(
// items concluded;
// thus idle token is Some;
// qed.
never!("unexpected abscence of the idle token in prepare pool");
never!("unexpected absence of the idle token in prepare pool");
}
} else {
// That's a relatively normal situation since the queue may send `start_work` and
Expand Down Expand Up @@ -294,44 +293,43 @@ fn handle_mux(
Ok(())
},
PoolEvent::StartWork(worker, outcome) => {
// If we receive any outcome other than `Concluded`, we attempt to kill the worker
// process.
// If we receive an outcome that the worker is unreachable or that an error occurred on
// the worker, we attempt to kill the worker process.
match outcome {
Outcome::Concluded { worker: idle, result } => {
let data = match spawned.get_mut(worker) {
None => {
// Perhaps the worker was killed meanwhile and the result is no longer
// relevant. We already send `Rip` when purging if we detect that the
// worker is dead.
return Ok(())
},
Some(data) => data,
};

// We just replace the idle worker that was loaned from this option during
// the work starting.
let old = data.idle.replace(idle);
assert_matches!(old, None, "attempt to overwrite an idle worker");

reply(from_pool, FromPool::Concluded { worker, rip: false, result })?;

Ok(())
},
Outcome::Concluded { worker: idle, result } =>
handle_concluded_no_rip(from_pool, spawned, worker, idle, result),
// Return `Concluded`, but do not kill the worker since the error was on the host side.
Outcome::CreateTmpFileErr { worker: idle, err } => handle_concluded_no_rip(
from_pool,
spawned,
worker,
idle,
Err(PrepareError::CreateTmpFileErr(err)),
),
// Return `Concluded`, but do not kill the worker since the error was on the host side.
Outcome::RenameTmpFileErr { worker: idle, result: _, err } =>
handle_concluded_no_rip(
from_pool,
spawned,
worker,
idle,
Err(PrepareError::RenameTmpFileErr(err)),
),
Outcome::Unreachable => {
if attempt_retire(metrics, spawned, worker) {
reply(from_pool, FromPool::Rip(worker))?;
}

Ok(())
},
Outcome::DidNotMakeIt => {
Outcome::IoErr => {
if attempt_retire(metrics, spawned, worker) {
reply(
from_pool,
FromPool::Concluded {
worker,
rip: true,
result: Err(PrepareError::DidNotMakeIt),
result: Err(PrepareError::IoErr),
},
)?;
}
Expand Down Expand Up @@ -380,6 +378,40 @@ fn attempt_retire(
}
}

/// Handles the case where we received a response. There potentially was an error, but not the fault
/// of the worker as far as we know, so the worker should not be killed.
///
/// This function tries to put the idle worker back into the pool and then replies with
/// `FromPool::Concluded` with `rip: false`.
fn handle_concluded_no_rip(
from_pool: &mut mpsc::UnboundedSender<FromPool>,
spawned: &mut HopSlotMap<Worker, WorkerData>,
worker: Worker,
idle: IdleWorker,
result: PrepareResult,
) -> Result<(), Fatal> {
let data = match spawned.get_mut(worker) {
None => {
// Perhaps the worker was killed meanwhile and the result is no longer relevant. We
// already send `Rip` when purging if we detect that the worker is dead.
return Ok(())
},
Some(data) => data,
};

// We just replace the idle worker that was loaned from this option during
// the work starting.
let old = data.idle.replace(idle);
never!(
old.is_some(),
"old idle worker was taken out when starting work; we only replace it here; qed"
);

reply(from_pool, FromPool::Concluded { worker, rip: false, result })?;

Ok(())
}

/// Spins up the pool and returns the future that should be polled to make the pool functional.
pub fn start(
metrics: Metrics,
Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/prepare/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ mod tests {
test.send_from_pool(pool::FromPool::Concluded {
worker: w1,
rip: true,
result: Err(PrepareError::DidNotMakeIt),
result: Err(PrepareError::IoErr),
});
test.poll_ensure_to_pool_is_empty().await;
}
Expand Down
Loading

0 comments on commit 9cf76e3

Please sign in to comment.