This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Let the PVF host kill the worker on timeout #6381

Merged
merged 5 commits on Dec 6, 2022
4 changes: 3 additions & 1 deletion node/core/pvf/src/execute/worker.rs
@@ -74,6 +74,8 @@ pub enum Outcome {

/// Given the idle token of a worker and parameters of work, communicates with the worker and
/// returns the outcome.
///
/// NOTE: Returning the `HardTimeout` or `IoErr` errors will trigger the child process being killed.
pub async fn start_work(
worker: IdleWorker,
artifact: ArtifactPathId,
@@ -148,7 +150,7 @@ pub async fn start_work(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
"execution worker exceeded alloted time for execution",
"execution worker exceeded allotted time for execution",
);
// TODO: This case is not really a hard timeout as the timeout here in the host is
// lenient. Should fix this as part of
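The NOTE added above documents a contract rather than a mechanism: `start_work` only reports the outcome, and it is the host-side caller that reacts by killing the child process. Below is a minimal, self-contained sketch of that reaction, assuming a simplified `Outcome` enum and a hypothetical `kill_worker` helper; it is not the actual execute-queue code from this crate.

```rust
// Sketch only: simplified stand-ins for the real types in node/core/pvf.
enum Outcome {
    Ok { result_bytes: Vec<u8> },
    HardTimeout,
    IoErr,
}

struct WorkerHandle {
    pid: u32,
}

fn kill_worker(handle: &WorkerHandle) {
    // Hypothetical helper: in practice the host owns the child handle and kills it.
    println!("killing worker with pid {}", handle.pid);
}

fn handle_outcome(outcome: Outcome, handle: &WorkerHandle) {
    match outcome {
        // The job finished; the worker can be reused for the next execution.
        Outcome::Ok { result_bytes } => {
            println!("job done, {} result bytes", result_bytes.len());
        },
        // Per the NOTE in the diff: a hard timeout or an I/O error means the
        // worker can no longer be trusted, so the host kills it.
        Outcome::HardTimeout | Outcome::IoErr => kill_worker(handle),
    }
}

fn main() {
    let handle = WorkerHandle { pid: 4242 };
    handle_outcome(Outcome::HardTimeout, &handle);
}
```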
2 changes: 1 addition & 1 deletion node/core/pvf/src/host.rs
@@ -390,7 +390,7 @@ async fn run(
from_prepare_queue = from_prepare_queue_rx.next() => {
let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal));

// Note that preparation always succeeds.
// Note that the preparation outcome is always reported as concluded.
//
// That's because the error conditions are written into the artifact and will be
// reported at the time of the execution. It potentially, but not necessarily, can
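For context on the reworded comment: preparation is always reported back to the host loop as concluded, and any error is recorded with the artifact so that it only surfaces when execution is attempted. The sketch below illustrates that idea with simplified, hypothetical types (an `ArtifactState` enum in a plain `HashMap` keyed by a code hash); it is not the actual artifact table in this crate.

```rust
use std::collections::HashMap;

// Simplified stand-in for the artifact table.
enum ArtifactState {
    Prepared { path: String },
    FailedToProcess { error: String },
}

fn on_prepare_concluded(
    artifacts: &mut HashMap<u64, ArtifactState>,
    code_hash: u64,
    result: Result<String, String>,
) {
    // Whether preparation succeeded or failed, the queue reports "concluded";
    // the error, if any, is stored so it can be reported later.
    let state = match result {
        Ok(path) => ArtifactState::Prepared { path },
        Err(error) => ArtifactState::FailedToProcess { error },
    };
    artifacts.insert(code_hash, state);
}

fn on_execute_request(artifacts: &HashMap<u64, ArtifactState>, code_hash: u64) {
    match artifacts.get(&code_hash) {
        Some(ArtifactState::Prepared { path }) => println!("execute artifact at {path}"),
        Some(ArtifactState::FailedToProcess { error }) => {
            // The preparation error is only reported here, at execution time.
            println!("cannot execute: preparation failed earlier: {error}")
        },
        None => println!("artifact unknown; schedule preparation"),
    }
}

fn main() {
    let mut artifacts = HashMap::new();
    on_prepare_concluded(&mut artifacts, 1, Err("timed out".to_string()));
    on_execute_request(&artifacts, 1);
}
```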
5 changes: 4 additions & 1 deletion node/core/pvf/src/prepare/pool.rs
@@ -294,12 +294,15 @@ fn handle_mux(
Ok(())
},
PoolEvent::StartWork(worker, outcome) => {
// If we receive any outcome other than `Concluded`, we attempt to kill the worker
// process.
match outcome {
Outcome::Concluded { worker: idle, result } => {
let data = match spawned.get_mut(worker) {
None => {
// Perhaps the worker was killed meanwhile and the result is no longer
// relevant.
// relevant. We already send `Rip` when purging if we detect that the
// worker is dead.
return Ok(())
},
Some(data) => data,
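The comment added to `handle_mux` above states the pool's policy: only a `Concluded` outcome keeps the worker, any other outcome leads to an attempt to kill the worker process, and a result for a worker that is no longer tracked is simply dropped. The sketch below shows that shape with simplified stand-in types and a hypothetical `attempt_kill` helper; the variant names other than `Concluded` are illustrative, not the exact ones from the pool.

```rust
use std::collections::HashMap;

// Simplified stand-in for the prepare worker outcome.
enum Outcome {
    Concluded { result: Result<(), String> },
    TimedOut,
    DidNotMakeIt,
}

fn attempt_kill(pid: u32) {
    // Hypothetical stand-in for killing the child process.
    println!("attempting to kill worker {pid}");
}

fn on_start_work_outcome(spawned: &mut HashMap<u32, String>, pid: u32, outcome: Outcome) {
    match outcome {
        Outcome::Concluded { result } => {
            if !spawned.contains_key(&pid) {
                // The worker may have been killed meanwhile; the result is no
                // longer relevant, and the death is reported when purging.
                return
            }
            println!("worker {pid} concluded with {result:?}; keep it idle");
        },
        // Any non-`Concluded` outcome: the worker can no longer be trusted.
        Outcome::TimedOut | Outcome::DidNotMakeIt => {
            spawned.remove(&pid);
            attempt_kill(pid);
        },
    }
}

fn main() {
    let mut spawned = HashMap::from([(7u32, "worker-7".to_string())]);
    on_start_work_outcome(&mut spawned, 7, Outcome::TimedOut);
}
```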
10 changes: 7 additions & 3 deletions node/core/pvf/src/prepare/worker.rs
@@ -78,6 +78,9 @@ enum Selected {

/// Given the idle token of a worker and parameters of work, communicates with the worker and
/// returns the outcome.
///
/// NOTE: Returning the `TimedOut` or `DidNotMakeIt` errors will trigger the child process being
/// killed.
pub async fn start_work(
worker: IdleWorker,
code: Arc<Vec<u8>>,
@@ -149,6 +152,7 @@ pub async fn start_work(
},
};

// NOTE: A `TimedOut` or `DidNotMakeIt` error triggers the child process being killed.
Contributor Author

I added some comments about the error handling flow. Having too many comments is
not ideal, as they may not hold true in the future if the code changes. But it
is not obvious just from reading this code in isolation what the consequences of
these errors are, and I feel like something as significant as killing the worker
should be called out.

These comments would have helped me understand the error flow earlier, but if it
is too much, let me know.

match selected {
// Timed out on the child. This should already be logged by the child.
Selected::Done(Err(PrepareError::TimedOut)) => Outcome::TimedOut,
@@ -162,6 +166,9 @@
}

/// Handles the case where we successfully received response bytes on the host from the child.
///
/// NOTE: Here we know the artifact exists, but is still located in a temporary file which will be
/// cleared by `with_tmp_file`.
async fn handle_response_bytes(
response_bytes: Vec<u8>,
pid: u32,
@@ -201,9 +208,6 @@ async fn handle_response_bytes(
);

// Return a timeout error.
//
// NOTE: The artifact exists, but is located in a temporary file which
// will be cleared by `with_tmp_file`.
return Selected::Deadline
}

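The new NOTE on `handle_response_bytes` refers to the temporary file the artifact is written to before it is persisted. Below is a rough sketch of that `with_tmp_file` pattern; the helper shown here is a simplified synchronous stand-in with an assumed signature and path, not the actual async helper in the PVF code.

```rust
use std::fs;
use std::path::{Path, PathBuf};

// Run `f` against a temporary path, then remove the file regardless of the
// result, so a timed-out or failed job leaves nothing behind.
fn with_tmp_file<R>(tmp_path: PathBuf, f: impl FnOnce(&Path) -> R) -> R {
    let result = f(&tmp_path);
    let _ = fs::remove_file(&tmp_path);
    result
}

fn main() {
    // Hypothetical temporary path, for illustration only.
    let outcome = with_tmp_file(PathBuf::from("/tmp/pvf-artifact.tmp"), |path| {
        fs::write(path, b"compiled artifact bytes").map(|_| path.display().to_string())
    });
    println!("{outcome:?}");
}
```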
19 changes: 10 additions & 9 deletions node/core/pvf/src/worker_common.rs
@@ -199,10 +199,8 @@
where
}

/// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up
/// from sleeping and then either sleeps for the remaining CPU time, or kills the process if we
/// exceed the CPU timeout.
///
/// NOTE: Killed processes are detected and cleaned up in `purge_dead`.
/// from sleeping and then either sleeps for the remaining CPU time, or sends back a timeout error
/// if we exceed the CPU timeout.
///
/// NOTE: If the job completes and this thread is still sleeping, it will continue sleeping in the
/// background. When it wakes, it will see that the flag has been set and return.
@@ -233,7 +231,11 @@ pub async fn cpu_time_monitor_loop(
timeout.as_millis(),
);

// Send back a TimedOut error on timeout.
// Send back a `TimedOut` error.
//
// NOTE: This will cause the worker, whether preparation or execution, to be killed by
// the host. We do not kill the process here because it would interfere with the proper
// handling of this error.
let encoded_result = match job_kind {
JobKind::Prepare => {
let result: Result<(), PrepareError> = Err(PrepareError::TimedOut);
@@ -244,8 +246,8 @@
result.encode()
},
};
// If we error there is nothing else we can do here, and we are killing the process,
// anyway. The receiving side will just have to time out.
// If we error here there is nothing we can do apart from log it. The receiving side
// will just have to time out.
if let Err(err) = framed_send(&mut stream, encoded_result.as_slice()).await {
gum::warn!(
target: LOG_TARGET,
@@ -255,8 +257,7 @@
);
}

// Kill the process.
std::process::exit(1);
Contributor

What will happen in case of the timeout is that you will go on to the next iteration of the loop, hit the err branch, and return from this function. Is this behaviour desired?

Contributor Author

Good catch!

return
}

// Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep
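Taken together, the `worker_common.rs` changes turn the CPU-time monitor into a pure reporter: on timeout it sends an error back and returns, and killing the worker is left to the host. The sketch below mirrors that loop shape, using wall-clock time and an `mpsc` channel as stand-ins for the real thread CPU-time measurement and socket; names and signatures are illustrative only.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};

fn cpu_time_monitor_loop(
    timeout: Duration,
    finished: Arc<AtomicBool>,
    report_timeout: mpsc::Sender<&'static str>,
) {
    let started = Instant::now();
    loop {
        let elapsed = started.elapsed(); // stand-in for the job's CPU time
        if elapsed >= timeout {
            // Report the timeout and return; do NOT exit the process here,
            // since killing the worker is the host's job.
            let _ = report_timeout.send("TimedOut");
            return
        }
        // Sleep for the remaining budget, then re-check; the job may finish
        // while we sleep, in which case the flag tells us to stop quietly.
        thread::sleep(timeout - elapsed);
        if finished.load(Ordering::Relaxed) {
            return
        }
    }
}

fn main() {
    let finished = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel();
    let monitor = {
        let finished = finished.clone();
        thread::spawn(move || cpu_time_monitor_loop(Duration::from_millis(50), finished, tx))
    };
    // The "job" never finishes within the budget, so the monitor reports a timeout.
    if let Ok(msg) = rx.recv_timeout(Duration::from_millis(200)) {
        println!("monitor reported: {msg}");
    }
    monitor.join().unwrap();
}
```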