Skip to content

Commit

Permalink
change prepare worker to use fork instead of threads (#1685)
Browse files Browse the repository at this point in the history
Co-authored-by: Marcin S <marcin@realemail.net>
  • Loading branch information
jpserrat and mrcnski committed Nov 14, 2023
1 parent 3a87390 commit 54f8428
Show file tree
Hide file tree
Showing 24 changed files with 1,468 additions and 534 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 15 additions & 9 deletions polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,14 +642,19 @@ async fn validate_candidate_exhaustive(
},
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::WorkerReportedError(e))) =>
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::WorkerReportedInvalid(e))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
"ambiguous worker death".to_string(),
))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(err))) =>
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),

Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousJobDeath(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!(
"ambiguous job death: {err}"
)))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::PrepareError(e))) => {
// In principle if preparation of the `WASM` fails, the current candidate can not be the
// reason for that. So we can't say whether it is invalid or not. In addition, with
Expand Down Expand Up @@ -741,9 +746,9 @@ trait ValidationBackend {
};

// Allow limited retries for each kind of error.
let mut num_death_retries_left = 1;
let mut num_job_error_retries_left = 1;
let mut num_internal_retries_left = 1;
let mut num_awd_retries_left = 1;
let mut num_panic_retries_left = 1;
loop {
// Stop retrying if we exceeded the timeout.
if total_time_start.elapsed() + retry_delay > exec_timeout {
Expand All @@ -752,11 +757,12 @@ trait ValidationBackend {

match validation_result {
Err(ValidationError::InvalidCandidate(
WasmInvalidCandidate::AmbiguousWorkerDeath,
)) if num_awd_retries_left > 0 => num_awd_retries_left -= 1,
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(_)))
if num_panic_retries_left > 0 =>
num_panic_retries_left -= 1,
WasmInvalidCandidate::AmbiguousWorkerDeath |
WasmInvalidCandidate::AmbiguousJobDeath(_),
)) if num_death_retries_left > 0 => num_death_retries_left -= 1,
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError(_)))
if num_job_error_retries_left > 0 =>
num_job_error_retries_left -= 1,
Err(ValidationError::InternalError(_)) if num_internal_retries_left > 0 =>
num_internal_retries_left -= 1,
_ => break,
Expand Down
12 changes: 7 additions & 5 deletions polkadot/node/core/candidate-validation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,11 +695,13 @@ fn candidate_validation_retry_panic_errors() {

let v = executor::block_on(validate_candidate_exhaustive(
MockValidateCandidateBackend::with_hardcoded_result_list(vec![
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("foo".into()))),
// Throw an AWD error, we should still retry again.
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError("foo".into()))),
// Throw an AJD error, we should still retry again.
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousJobDeath(
"baz".into(),
))),
// Throw another panic error.
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("bar".into()))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError("bar".into()))),
]),
validation_data,
validation_code,
Expand Down Expand Up @@ -1216,7 +1218,7 @@ fn precheck_properly_classifies_outcomes() {

inner(Err(PrepareError::Prevalidation("foo".to_owned())), PreCheckOutcome::Invalid);
inner(Err(PrepareError::Preparation("bar".to_owned())), PreCheckOutcome::Invalid);
inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid);
inner(Err(PrepareError::JobError("baz".to_owned())), PreCheckOutcome::Invalid);

inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed);
inner(Err(PrepareError::IoErr("fizz".to_owned())), PreCheckOutcome::Failed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl TestHost {
where
F: FnOnce(&mut Config),
{
let (prepare_worker_path, execute_worker_path) = testing::get_and_check_worker_paths();
let (prepare_worker_path, execute_worker_path) = testing::build_workers_and_get_paths(true);

let cache_dir = tempfile::tempdir().unwrap();
let mut config = Config::new(
Expand Down
2 changes: 1 addition & 1 deletion polkadot/node/core/pvf/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cpu-time = "1.0.0"
futures = "0.3.21"
gum = { package = "tracing-gum", path = "../../../gum" }
libc = "0.2.139"
thiserror = "1.0.31"

parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }

Expand All @@ -30,7 +31,6 @@ sp-tracing = { path = "../../../../../substrate/primitives/tracing" }
[target.'cfg(target_os = "linux")'.dependencies]
landlock = "0.3.0"
seccompiler = "0.4.0"
thiserror = "1.0.31"

[dev-dependencies]
assert_matches = "1.4.0"
Expand Down
55 changes: 27 additions & 28 deletions polkadot/node/core/pvf/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ pub enum PrepareError {
/// Instantiation of the WASM module instance failed.
#[codec(index = 2)]
RuntimeConstruction(String),
/// An unexpected panic has occurred in the preparation worker.
/// An unexpected error has occurred in the preparation job.
#[codec(index = 3)]
Panic(String),
JobError(String),
/// Failed to prepare the PVF due to the time limit.
#[codec(index = 4)]
TimedOut,
Expand All @@ -48,12 +48,12 @@ pub enum PrepareError {
/// The temporary file for the artifact could not be created at the given cache path. This
/// state is reported by the validation host (not by the worker).
#[codec(index = 6)]
CreateTmpFileErr(String),
CreateTmpFile(String),
/// The response from the worker is received, but the file cannot be renamed (moved) to the
/// final destination location. This state is reported by the validation host (not by the
/// worker).
#[codec(index = 7)]
RenameTmpFileErr {
RenameTmpFile {
err: String,
// Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible
// conversion to `Option<String>`.
Expand All @@ -68,11 +68,14 @@ pub enum PrepareError {
/// reported by the validation host (not by the worker).
#[codec(index = 9)]
ClearWorkerDir(String),
/// The preparation job process died, due to OOM, a seccomp violation, or some other factor.
JobDied(String),
#[codec(index = 10)]
/// Some error occurred when interfacing with the kernel.
#[codec(index = 11)]
Kernel(String),
}

/// Pre-encoded length-prefixed `PrepareResult::Err(PrepareError::OutOfMemory)`
pub const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08";

impl PrepareError {
/// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those
/// errors depend on the PVF itself and the sc-executor/wasmtime logic.
Expand All @@ -83,12 +86,15 @@ impl PrepareError {
pub fn is_deterministic(&self) -> bool {
use PrepareError::*;
match self {
Prevalidation(_) | Preparation(_) | Panic(_) | OutOfMemory => true,
TimedOut |
Prevalidation(_) | Preparation(_) | JobError(_) | OutOfMemory => true,
IoErr(_) |
CreateTmpFileErr(_) |
RenameTmpFileErr { .. } |
ClearWorkerDir(_) => false,
JobDied(_) |
CreateTmpFile(_) |
RenameTmpFile { .. } |
ClearWorkerDir(_) |
Kernel(_) => false,
// Can occur due to issues with the PVF, but also due to factors like local load.
TimedOut => false,
// Can occur due to issues with the PVF, but also due to local errors.
RuntimeConstruction(_) => false,
}
Expand All @@ -102,14 +108,16 @@ impl fmt::Display for PrepareError {
Prevalidation(err) => write!(f, "prevalidation: {}", err),
Preparation(err) => write!(f, "preparation: {}", err),
RuntimeConstruction(err) => write!(f, "runtime construction: {}", err),
Panic(err) => write!(f, "panic: {}", err),
JobError(err) => write!(f, "panic: {}", err),
TimedOut => write!(f, "prepare: timeout"),
IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err),
CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err),
RenameTmpFileErr { err, src, dest } =>
JobDied(err) => write!(f, "prepare: prepare job died: {}", err),
CreateTmpFile(err) => write!(f, "prepare: error creating tmp file: {}", err),
RenameTmpFile { err, src, dest } =>
write!(f, "prepare: error renaming tmp file ({:?} -> {:?}): {}", src, dest, err),
OutOfMemory => write!(f, "prepare: out of memory"),
ClearWorkerDir(err) => write!(f, "prepare: error clearing worker cache: {}", err),
Kernel(err) => write!(f, "prepare: error interfacing with the kernel: {}", err),
}
}
}
Expand All @@ -133,9 +141,9 @@ pub enum InternalValidationError {
// conversion to `Option<String>`.
path: Option<String>,
},
/// An error occurred in the CPU time monitor thread. Should be totally unrelated to
/// validation.
CpuTimeMonitorThread(String),
/// Some error occurred when interfacing with the kernel.
Kernel(String),

/// Some non-deterministic preparation error occurred.
NonDeterministicPrepareError(PrepareError),
}
Expand All @@ -158,17 +166,8 @@ impl fmt::Display for InternalValidationError {
"validation: host could not clear the worker cache ({:?}) after a job: {}",
path, err
),
CpuTimeMonitorThread(err) =>
write!(f, "validation: an error occurred in the CPU time monitor thread: {}", err),
Kernel(err) => write!(f, "validation: error interfacing with the kernel: {}", err),
NonDeterministicPrepareError(err) => write!(f, "validation: prepare: {}", err),
}
}
}

#[test]
fn pre_encoded_payloads() {
let oom_enc = PrepareResult::Err(PrepareError::OutOfMemory).encode();
let mut oom_payload = oom_enc.len().to_le_bytes().to_vec();
oom_payload.extend(oom_enc);
assert_eq!(oom_payload, OOM_PAYLOAD);
}
51 changes: 45 additions & 6 deletions polkadot/node/core/pvf/common/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ pub struct Handshake {
pub executor_params: ExecutorParams,
}

/// The response from an execution job on the worker.
/// The response from the execution worker.
#[derive(Debug, Encode, Decode)]
pub enum Response {
pub enum WorkerResponse {
/// The job completed successfully.
Ok {
/// The result of parachain validation.
Expand All @@ -41,14 +41,38 @@ pub enum Response {
/// The candidate is invalid.
InvalidCandidate(String),
/// The job timed out.
TimedOut,
/// An unexpected panic has occurred in the execution worker.
Panic(String),
JobTimedOut,
/// The job process has died. We must kill the worker just in case.
///
/// We cannot treat this as an internal error because malicious code may have killed the job.
/// We still retry it, because in the non-malicious case it is likely spurious.
JobDied(String),
/// An unexpected error occurred in the job process, e.g. failing to spawn a thread, panic,
/// etc.
///
/// Because malicious code can cause a job error, we must not treat it as an internal error. We
/// still retry it, because in the non-malicious case it is likely spurious.
JobError(String),

/// Some internal error occurred.
InternalError(InternalValidationError),
}

impl Response {
/// The result of a job on the execution worker.
pub type JobResult = Result<JobResponse, JobError>;

/// The successful response from a job on the execution worker.
#[derive(Debug, Encode, Decode)]
pub enum JobResponse {
Ok {
/// The result of parachain validation.
result_descriptor: ValidationResult,
},
/// The candidate is invalid.
InvalidCandidate(String),
}

impl JobResponse {
/// Creates an invalid response from a context `ctx` and a message `msg` (which can be empty).
pub fn format_invalid(ctx: &'static str, msg: &str) -> Self {
if msg.is_empty() {
Expand All @@ -58,3 +82,18 @@ impl Response {
}
}
}

/// An unexpected error occurred in the execution job process. Because this comes from the job,
/// which executes untrusted code, this error must likewise be treated as untrusted. That is, we
/// cannot raise an internal error based on this.
#[derive(thiserror::Error, Debug, Encode, Decode)]
pub enum JobError {
#[error("The job timed out")]
TimedOut,
#[error("An unexpected panic has occurred in the execution job: {0}")]
Panic(String),
#[error("Could not spawn the requested thread: {0}")]
CouldNotSpawnThread(String),
#[error("An error occurred in the CPU time monitor thread: {0}")]
CpuTimeMonitorThread(String),
}
12 changes: 9 additions & 3 deletions polkadot/node/core/pvf/common/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,15 @@ impl fmt::Display for WorkerKind {
}
}

// The worker version must be passed in so that we accurately get the version of the worker, and not
// the version that this crate was compiled with.
pub fn worker_event_loop<F>(
// NOTE: The worker version must be passed in so that we accurately get the version of the worker,
// and not the version that this crate was compiled with.
//
// NOTE: This must not spawn any threads due to safety requirements in `event_loop` and to avoid
// errors in [`security::unshare_user_namespace_and_change_root`].
//
/// Initializes the worker process, then runs the given event loop, which spawns a new job process
/// to securely handle each incoming request.
pub fn run_worker<F>(
worker_kind: WorkerKind,
socket_path: PathBuf,
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut worker_dir_path: PathBuf,
Expand Down
3 changes: 3 additions & 0 deletions polkadot/node/core/pvf/execute-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ license.workspace = true
[dependencies]
cpu-time = "1.0.0"
gum = { package = "tracing-gum", path = "../../../gum" }
os_pipe = "1.1.4"
nix = { version = "0.27.1", features = ["resource", "process"]}
libc = "0.2.139"

parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }

Expand Down
Loading

0 comments on commit 54f8428

Please sign in to comment.