Skip to content

Commit

Permalink
Revert "Remove potential for panics from spawn APIs (pantsbuild#17974)"
Browse files Browse the repository at this point in the history
This reverts commit a97f9c2.
  • Loading branch information
jsirois committed Jan 16, 2023
1 parent ed08f75 commit e67baf0
Show file tree
Hide file tree
Showing 16 changed files with 374 additions and 444 deletions.
10 changes: 1 addition & 9 deletions src/rust/engine/fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,15 +474,7 @@ impl PosixFS {
let vfs = self.clone();
self
.executor
.spawn_blocking(
move || vfs.scandir_sync(&dir_relative_to_root),
|e| {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Synchronous scandir failed: {e}"),
))
},
)
.spawn_blocking(move || vfs.scandir_sync(&dir_relative_to_root))
.await
}

Expand Down
17 changes: 7 additions & 10 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1312,16 +1312,13 @@ impl Store {
store
.local
.executor()
.spawn_blocking(
move || {
if is_root {
fs::safe_create_dir_all(&destination2)
} else {
fs::safe_create_dir(&destination2)
}
},
|e| Err(format!("Directory creation task failed: {e}")),
)
.spawn_blocking(move || {
if is_root {
fs::safe_create_dir_all(&destination2)
} else {
fs::safe_create_dir(&destination2)
}
})
.map_err(|e| {
format!(
"Failed to create directory {}: {}",
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ impl<N: Node> Graph<N> {
nodes: HashMap::default(),
pg: DiGraph::new(),
}));
let _join = executor.native_spawn(Self::cycle_check_task(Arc::downgrade(&inner)));
let _join = executor.spawn(Self::cycle_check_task(Arc::downgrade(&inner)));

Graph {
inner,
Expand Down
16 changes: 10 additions & 6 deletions src/rust/engine/logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@
#![allow(clippy::mutex_atomic)]

///
/// Macro to allow fatal logging to a file which bypasses the standard logging systems.
/// This is useful for code paths which must not interact with stdio or the logging system, and
/// can also be useful for one-off debugging of stdio, logging, or pantsd issues.
/// Macro to allow debug logging to a file which bypasses the standard logging systems.
/// This is useful for one-off debugging, and is code that several developers have found they're
/// writing a lot as one-offs when working in the rust code (particularly when working on logging),
/// so this is a useful macro to exist for one-off use.
///
/// This should not be used for actual production logging; use the log crate's macros
/// (info!, debug!, trace!) for that.
///
#[macro_export]
macro_rules! fatal_log {
($($arg:tt)+) => {
macro_rules! debug_log {
($path:expr, $($arg:tt)+) => {
{
use ::std::io::Write;
let mut f = ::std::fs::OpenOptions::new().create(true).append(true).open("fatal.log").unwrap();
let mut f = ::std::fs::OpenOptions::new().create(true).append(true).open($path).unwrap();
writeln!(f, $($arg)+).unwrap()
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/logging/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl Log for PantsLogger {
Err(e) => {
// If we've failed to write to stdio, but also to our log file, our only recourse is to
// try to write to a different file.
fatal_log!("Failed to write to log file {:?}: {}", file, e);
debug_log!("fatal.log", "Failed to write to log file {:?}: {}", file, e);
}
}
}
Expand Down
32 changes: 13 additions & 19 deletions src/rust/engine/nailgun/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Server {
let (exited_sender, exited_receiver) = oneshot::channel();
let (exit_sender, exit_receiver) = oneshot::channel();

let _join = executor.native_spawn(Self::serve(
let _join = executor.spawn(Self::serve(
executor.clone(),
config,
nail,
Expand Down Expand Up @@ -125,7 +125,7 @@ impl Server {
// acquired it. Unfortunately we cannot acquire the lock in this thread and then send the
// guard to the other thread due to its lifetime bounds.
let connection_started = Arc::new(Notify::new());
let _join = executor.native_spawn({
let _join = executor.spawn({
let config = config.clone();
let nail = nail.clone();
let connection_started = connection_started.clone();
Expand Down Expand Up @@ -211,7 +211,7 @@ impl Nail for RawFdNail {
let maybe_stdin_write = if let Some(mut stdin_sink) = stdin_sink {
let (stdin_write, stdin_read) = child_channel::<ChildInput>();
// Spawn a task that will propagate the input stream.
let _join = self.executor.native_spawn(async move {
let _join = self.executor.spawn(async move {
let mut input_stream = stdin_read.map(|child_input| match child_input {
ChildInput::Stdin(bytes) => Ok(bytes),
});
Expand Down Expand Up @@ -243,22 +243,16 @@ impl Nail for RawFdNail {
let nail = self.clone();
let exit_code = self
.executor
.spawn_blocking(
move || {
// NB: This closure captures the stdio handles, and will drop/close them when it completes.
(nail.runner)(RawFdExecution {
cmd,
cancelled,
stdin_fd: stdin_handle.as_raw_fd(),
stdout_fd: stdout_handle.as_raw_fd(),
stderr_fd: stderr_handle.as_raw_fd(),
})
},
|e| {
log::warn!("Server exited uncleanly: {e}");
ExitCode(1)
},
)
.spawn_blocking(move || {
// NB: This closure captures the stdio handles, and will drop/close them when it completes.
(nail.runner)(RawFdExecution {
cmd,
cancelled,
stdin_fd: stdin_handle.as_raw_fd(),
stdout_fd: stdout_handle.as_raw_fd(),
stderr_fd: stderr_handle.as_raw_fd(),
})
})
.boxed();

// Select a single stdout/stderr stream.
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/process_execution/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl AsyncSemaphore {
// Spawn a task which will periodically balance Tasks.
let _balancer_task = {
let state = Arc::downgrade(&state);
executor.native_spawn(async move {
executor.spawn(async move {
loop {
sleep(preemptible_duration / 4).await;
if let Some(state) = state.upgrade() {
Expand Down
97 changes: 47 additions & 50 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,59 +734,56 @@ pub async fn prepare_workdir(
let output_dir_paths = req.output_directories.clone();
let maybe_jdk_home = req.jdk_home.clone();
let exclusive_spawn = executor
.spawn_blocking(
move || {
if let Some(jdk_home) = maybe_jdk_home {
symlink(jdk_home, workdir_path2.join(".jdk"))
.map_err(|err| format!("Error making JDK symlink for local execution: {:?}", err))?
}
.spawn_blocking(move || {
if let Some(jdk_home) = maybe_jdk_home {
symlink(jdk_home, workdir_path2.join(".jdk"))
.map_err(|err| format!("Error making JDK symlink for local execution: {:?}", err))?
}

// The bazel remote execution API specifies that the parent directories for output files and
// output directories should be created before execution completes: see the method doc.
let parent_paths_to_create: HashSet<_> = output_file_paths
.iter()
.chain(output_dir_paths.iter())
.map(|relative_path| relative_path.as_ref())
.chain(workdir_symlinks.iter().map(|s| s.src.as_path()))
.filter_map(|rel_path| rel_path.parent())
.map(|parent_relpath| workdir_path2.join(parent_relpath))
.collect();
for path in parent_paths_to_create {
create_dir_all(path.clone()).map_err(|err| {
format!(
"Error making parent directory {:?} for local execution: {:?}",
path, err
)
})?;
}
// The bazel remote execution API specifies that the parent directories for output files and
// output directories should be created before execution completes: see the method doc.
let parent_paths_to_create: HashSet<_> = output_file_paths
.iter()
.chain(output_dir_paths.iter())
.map(|relative_path| relative_path.as_ref())
.chain(workdir_symlinks.iter().map(|s| s.src.as_path()))
.filter_map(|rel_path| rel_path.parent())
.map(|parent_relpath| workdir_path2.join(parent_relpath))
.collect();
for path in parent_paths_to_create {
create_dir_all(path.clone()).map_err(|err| {
format!(
"Error making parent directory {:?} for local execution: {:?}",
path, err
)
})?;
}

for workdir_symlink in workdir_symlinks {
let src = workdir_path2.join(&workdir_symlink.src);
symlink(&workdir_symlink.dst, &src).map_err(|err| {
format!(
"Error linking {} -> {} for local execution: {:?}",
src.display(),
workdir_symlink.dst.display(),
err
)
})?;
}
for workdir_symlink in workdir_symlinks {
let src = workdir_path2.join(&workdir_symlink.src);
symlink(&workdir_symlink.dst, &src).map_err(|err| {
format!(
"Error linking {} -> {} for local execution: {:?}",
src.display(),
workdir_symlink.dst.display(),
err
)
})?;
}

let exe_was_materialized = maybe_executable_path
.as_ref()
.map_or(false, |p| workdir_path2.join(p).exists());
if exe_was_materialized {
debug!(
"Obtaining exclusive spawn lock for process since \
let exe_was_materialized = maybe_executable_path
.as_ref()
.map_or(false, |p| workdir_path2.join(p).exists());
if exe_was_materialized {
debug!(
"Obtaining exclusive spawn lock for process since \
we materialized its executable {:?}.",
maybe_executable_path
);
}
let res: Result<_, String> = Ok(exe_was_materialized);
res
},
|e| Err(format!("Directory materialization task failed: {e}")),
)
maybe_executable_path
);
}
let res: Result<_, String> = Ok(exe_was_materialized);
res
})
.await?;
Ok(exclusive_spawn)
}
Expand Down Expand Up @@ -849,7 +846,7 @@ impl AsyncDropSandbox {
impl Drop for AsyncDropSandbox {
fn drop(&mut self) {
if let Some(sandbox) = self.2.take() {
let _background_cleanup = self.0.native_spawn_blocking(|| std::mem::drop(sandbox));
let _background_cleanup = self.0.spawn_blocking(|| std::mem::drop(sandbox));
}
}
}
Expand Down
13 changes: 5 additions & 8 deletions src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,10 @@ impl NailgunProcess {

// Spawn the process and read its port from stdout.
let (child, port) = executor
.spawn_blocking(
{
let workdir = workdir.path().to_owned();
move || spawn_and_read_port(startup_options, workdir)
},
|e| Err(format!("Nailgun spawn task failed: {e}")),
)
.spawn_blocking({
let workdir = workdir.path().to_owned();
move || spawn_and_read_port(startup_options, workdir)
})
.await?;
debug!(
"Created nailgun server process with pid {} and port {}",
Expand Down Expand Up @@ -543,7 +540,7 @@ async fn clear_workdir(
future::try_join_all(moves).await?;

// And drop it in the background.
let _ = executor.native_spawn_blocking(move || std::mem::drop(garbage_dir));
let _ = executor.spawn_blocking(move || std::mem::drop(garbage_dir));

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl Drop for RunningOperation {
if let Some(operation_name) = self.name.take() {
debug!("Canceling remote operation {operation_name}");
let mut operations_client = self.operations_client.as_ref().clone();
let _ = self.executor.native_spawn(async move {
let _ = self.executor.spawn(async move {
operations_client
.cancel_operation(CancelOperationRequest {
name: operation_name,
Expand Down
Loading

0 comments on commit e67baf0

Please sign in to comment.