From 1248e2f3f27f8d2d15004482613d2b483ac7cb14 Mon Sep 17 00:00:00 2001
From: Stu Hood
Date: Wed, 11 Jan 2023 13:07:52 -0800
Subject: [PATCH] Adjust the `task_executor` API to not panic for JoinErrors,
 which will make race conditions around Scheduler shutdown safe.

---
 src/rust/engine/fs/src/lib.rs                 |  10 +-
 src/rust/engine/fs/store/src/lib.rs           |  17 +-
 src/rust/engine/graph/src/lib.rs              |   2 +-
 src/rust/engine/logging/src/lib.rs            |  16 +-
 src/rust/engine/logging/src/logger.rs         |   2 +-
 src/rust/engine/nailgun/src/server.rs         |  32 +-
 .../engine/process_execution/src/bounded.rs   |   2 +-
 .../engine/process_execution/src/local.rs     |  97 ++--
 .../src/nailgun/nailgun_pool.rs               |  13 +-
 .../engine/process_execution/src/remote.rs    |   2 +-
 src/rust/engine/sharded_lmdb/src/lib.rs       | 488 +++++++++---------
 src/rust/engine/src/context.rs                |   2 +-
 src/rust/engine/src/session.rs                |  53 +-
 src/rust/engine/task_executor/src/lib.rs      |  56 +-
 src/rust/engine/ui/src/console_ui.rs          |   9 +-
 src/rust/engine/watch/src/lib.rs              |  17 +-
 16 files changed, 444 insertions(+), 374 deletions(-)

diff --git a/src/rust/engine/fs/src/lib.rs b/src/rust/engine/fs/src/lib.rs
index e4976217911b..46bb47181643 100644
--- a/src/rust/engine/fs/src/lib.rs
+++ b/src/rust/engine/fs/src/lib.rs
@@ -474,7 +474,15 @@ impl PosixFS {
     let vfs = self.clone();
     self
       .executor
-      .spawn_blocking(move || vfs.scandir_sync(&dir_relative_to_root))
+      .spawn_blocking(
+        move || vfs.scandir_sync(&dir_relative_to_root),
+        |e| {
+          Err(io::Error::new(
+            io::ErrorKind::Other,
+            format!("Synchronous scandir failed: {e}"),
+          ))
+        },
+      )
       .await
   }
 
diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs
index 3461b93f5fef..1682165b479f 100644
--- a/src/rust/engine/fs/store/src/lib.rs
+++ b/src/rust/engine/fs/store/src/lib.rs
@@ -1312,13 +1312,16 @@ impl Store {
     store
       .local
       .executor()
-      .spawn_blocking(move || {
-        if is_root {
-          fs::safe_create_dir_all(&destination2)
-        } else {
-          fs::safe_create_dir(&destination2)
-        }
-      })
+      .spawn_blocking(
+        move || {
+          if is_root {
+            fs::safe_create_dir_all(&destination2)
+          } else {
+            fs::safe_create_dir(&destination2)
+          }
+        },
+        |e| Err(format!("Directory creation task failed: {e}")),
+      )
       .map_err(|e| {
         format!(
           "Failed to create directory {}: {}",
diff --git a/src/rust/engine/graph/src/lib.rs b/src/rust/engine/graph/src/lib.rs
index 04971a1992f8..52ad1c5f7e4e 100644
--- a/src/rust/engine/graph/src/lib.rs
+++ b/src/rust/engine/graph/src/lib.rs
@@ -402,7 +402,7 @@ impl<N: Node> Graph<N> {
       nodes: HashMap::default(),
       pg: DiGraph::new(),
     }));
-    let _join = executor.spawn(Self::cycle_check_task(Arc::downgrade(&inner)));
+    let _join = executor.native_spawn(Self::cycle_check_task(Arc::downgrade(&inner)));
 
     Graph {
       inner,
diff --git a/src/rust/engine/logging/src/lib.rs b/src/rust/engine/logging/src/lib.rs
index e8bfa8004f1c..2c1d924e966b 100644
--- a/src/rust/engine/logging/src/lib.rs
+++ b/src/rust/engine/logging/src/lib.rs
@@ -26,20 +26,16 @@
 #![allow(clippy::mutex_atomic)]
 
 ///
-/// Macro to allow debug logging to a file which bypasses the standard logging systems.
-/// This is useful for one-off debugging, and is code that several developers have found they're
-/// writing a lot as one-offs when working in the rust code (particularly when working on logging),
-/// so this is a useful macro to exist for one-off use.
-///
-/// This should not be used for actual production logging; use the log crate's macros
-/// (info!, debug!, trace!) for that.
+/// Macro to allow fatal logging to a file which bypasses the standard logging systems.
+/// This is useful for code paths which must not interact with stdio or the logging system, and
+/// can also be useful for one-off debugging of stdio, logging, or pantsd issues.
 ///
 #[macro_export]
-macro_rules! debug_log {
-  ($path:expr, $($arg:tt)+) => {
+macro_rules! fatal_log {
+  ($($arg:tt)+) => {
     {
       use ::std::io::Write;
-      let mut f = ::std::fs::OpenOptions::new().create(true).append(true).open($path).unwrap();
+      let mut f = ::std::fs::OpenOptions::new().create(true).append(true).open("fatal.log").unwrap();
       writeln!(f, $($arg)+).unwrap()
     }
   };
diff --git a/src/rust/engine/logging/src/logger.rs b/src/rust/engine/logging/src/logger.rs
index f046f4eaed86..6632e785d162 100644
--- a/src/rust/engine/logging/src/logger.rs
+++ b/src/rust/engine/logging/src/logger.rs
@@ -234,7 +234,7 @@ impl Log for PantsLogger {
           Err(e) => {
             // If we've failed to write to stdio, but also to our log file, our only recourse is to
             // try to write to a different file.
-            debug_log!("fatal.log", "Failed to write to log file {:?}: {}", file, e);
+            fatal_log!("Failed to write to log file {:?}: {}", file, e);
           }
         }
       }
diff --git a/src/rust/engine/nailgun/src/server.rs b/src/rust/engine/nailgun/src/server.rs
index b01471b73e01..498c3c8151c5 100644
--- a/src/rust/engine/nailgun/src/server.rs
+++ b/src/rust/engine/nailgun/src/server.rs
@@ -59,7 +59,7 @@ impl Server {
     let (exited_sender, exited_receiver) = oneshot::channel();
     let (exit_sender, exit_receiver) = oneshot::channel();
 
-    let _join = executor.spawn(Self::serve(
+    let _join = executor.native_spawn(Self::serve(
       executor.clone(),
       config,
       nail,
@@ -125,7 +125,7 @@ impl Server {
     // acquired it. Unfortunately we cannot acquire the lock in this thread and then send the
     // guard to the other thread due to its lifetime bounds.
     let connection_started = Arc::new(Notify::new());
-    let _join = executor.spawn({
+    let _join = executor.native_spawn({
       let config = config.clone();
       let nail = nail.clone();
      let connection_started = connection_started.clone();
@@ -211,7 +211,7 @@ impl Nail for RawFdNail {
     let maybe_stdin_write = if let Some(mut stdin_sink) = stdin_sink {
       let (stdin_write, stdin_read) = child_channel::<ChildInput>();
       // Spawn a task that will propagate the input stream.
-      let _join = self.executor.spawn(async move {
+      let _join = self.executor.native_spawn(async move {
         let mut input_stream = stdin_read.map(|child_input| match child_input {
           ChildInput::Stdin(bytes) => Ok(bytes),
         });
@@ -243,16 +243,22 @@ impl Nail for RawFdNail {
     let nail = self.clone();
     let exit_code = self
       .executor
-      .spawn_blocking(move || {
-        // NB: This closure captures the stdio handles, and will drop/close them when it completes.
-        (nail.runner)(RawFdExecution {
-          cmd,
-          cancelled,
-          stdin_fd: stdin_handle.as_raw_fd(),
-          stdout_fd: stdout_handle.as_raw_fd(),
-          stderr_fd: stderr_handle.as_raw_fd(),
-        })
-      })
+      .spawn_blocking(
+        move || {
+          // NB: This closure captures the stdio handles, and will drop/close them when it completes.
+          (nail.runner)(RawFdExecution {
+            cmd,
+            cancelled,
+            stdin_fd: stdin_handle.as_raw_fd(),
+            stdout_fd: stdout_handle.as_raw_fd(),
+            stderr_fd: stderr_handle.as_raw_fd(),
+          })
+        },
+        |e| {
+          log::warn!("Server exited uncleanly: {e}");
+          ExitCode(1)
+        },
+      )
       .boxed();
 
     // Select a single stdout/stderr stream.
diff --git a/src/rust/engine/process_execution/src/bounded.rs b/src/rust/engine/process_execution/src/bounded.rs index 849235244895..1ec0420e8c4a 100644 --- a/src/rust/engine/process_execution/src/bounded.rs +++ b/src/rust/engine/process_execution/src/bounded.rs @@ -204,7 +204,7 @@ impl AsyncSemaphore { // Spawn a task which will periodically balance Tasks. let _balancer_task = { let state = Arc::downgrade(&state); - executor.spawn(async move { + executor.native_spawn(async move { loop { sleep(preemptible_duration / 4).await; if let Some(state) = state.upgrade() { diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index b0bb8c6318d0..d1f10061fd9c 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -734,56 +734,59 @@ pub async fn prepare_workdir( let output_dir_paths = req.output_directories.clone(); let maybe_jdk_home = req.jdk_home.clone(); let exclusive_spawn = executor - .spawn_blocking(move || { - if let Some(jdk_home) = maybe_jdk_home { - symlink(jdk_home, workdir_path2.join(".jdk")) - .map_err(|err| format!("Error making JDK symlink for local execution: {:?}", err))? - } + .spawn_blocking( + move || { + if let Some(jdk_home) = maybe_jdk_home { + symlink(jdk_home, workdir_path2.join(".jdk")) + .map_err(|err| format!("Error making JDK symlink for local execution: {:?}", err))? + } - // The bazel remote execution API specifies that the parent directories for output files and - // output directories should be created before execution completes: see the method doc. - let parent_paths_to_create: HashSet<_> = output_file_paths - .iter() - .chain(output_dir_paths.iter()) - .map(|relative_path| relative_path.as_ref()) - .chain(workdir_symlinks.iter().map(|s| s.src.as_path())) - .filter_map(|rel_path| rel_path.parent()) - .map(|parent_relpath| workdir_path2.join(parent_relpath)) - .collect(); - for path in parent_paths_to_create { - create_dir_all(path.clone()).map_err(|err| { - format!( - "Error making parent directory {:?} for local execution: {:?}", - path, err - ) - })?; - } + // The bazel remote execution API specifies that the parent directories for output files and + // output directories should be created before execution completes: see the method doc. 
+ let parent_paths_to_create: HashSet<_> = output_file_paths + .iter() + .chain(output_dir_paths.iter()) + .map(|relative_path| relative_path.as_ref()) + .chain(workdir_symlinks.iter().map(|s| s.src.as_path())) + .filter_map(|rel_path| rel_path.parent()) + .map(|parent_relpath| workdir_path2.join(parent_relpath)) + .collect(); + for path in parent_paths_to_create { + create_dir_all(path.clone()).map_err(|err| { + format!( + "Error making parent directory {:?} for local execution: {:?}", + path, err + ) + })?; + } - for workdir_symlink in workdir_symlinks { - let src = workdir_path2.join(&workdir_symlink.src); - symlink(&workdir_symlink.dst, &src).map_err(|err| { - format!( - "Error linking {} -> {} for local execution: {:?}", - src.display(), - workdir_symlink.dst.display(), - err - ) - })?; - } + for workdir_symlink in workdir_symlinks { + let src = workdir_path2.join(&workdir_symlink.src); + symlink(&workdir_symlink.dst, &src).map_err(|err| { + format!( + "Error linking {} -> {} for local execution: {:?}", + src.display(), + workdir_symlink.dst.display(), + err + ) + })?; + } - let exe_was_materialized = maybe_executable_path - .as_ref() - .map_or(false, |p| workdir_path2.join(p).exists()); - if exe_was_materialized { - debug!( - "Obtaining exclusive spawn lock for process since \ + let exe_was_materialized = maybe_executable_path + .as_ref() + .map_or(false, |p| workdir_path2.join(p).exists()); + if exe_was_materialized { + debug!( + "Obtaining exclusive spawn lock for process since \ we materialized its executable {:?}.", - maybe_executable_path - ); - } - let res: Result<_, String> = Ok(exe_was_materialized); - res - }) + maybe_executable_path + ); + } + let res: Result<_, String> = Ok(exe_was_materialized); + res + }, + |e| Err(format!("Directory materialization task failed: {e}")), + ) .await?; Ok(exclusive_spawn) } @@ -846,7 +849,7 @@ impl AsyncDropSandbox { impl Drop for AsyncDropSandbox { fn drop(&mut self) { if let Some(sandbox) = self.2.take() { - let _background_cleanup = self.0.spawn_blocking(|| std::mem::drop(sandbox)); + let _background_cleanup = self.0.native_spawn_blocking(|| std::mem::drop(sandbox)); } } } diff --git a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs index af6d72dbcb8e..53653f8fe25c 100644 --- a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs +++ b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs @@ -382,10 +382,13 @@ impl NailgunProcess { // Spawn the process and read its port from stdout. let (child, port) = executor - .spawn_blocking({ - let workdir = workdir.path().to_owned(); - move || spawn_and_read_port(startup_options, workdir) - }) + .spawn_blocking( + { + let workdir = workdir.path().to_owned(); + move || spawn_and_read_port(startup_options, workdir) + }, + |e| Err(format!("Nailgun spawn task failed: {e}")), + ) .await?; debug!( "Created nailgun server process with pid {} and port {}", @@ -540,7 +543,7 @@ async fn clear_workdir( future::try_join_all(moves).await?; // And drop it in the background. 
- let _ = executor.spawn_blocking(move || std::mem::drop(garbage_dir)); + let _ = executor.native_spawn_blocking(move || std::mem::drop(garbage_dir)); Ok(()) } diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index cab764ea55f2..86b9ae7132e4 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -167,7 +167,7 @@ impl Drop for RunningOperation { if let Some(operation_name) = self.name.take() { debug!("Canceling remote operation {operation_name}"); let mut operations_client = self.operations_client.as_ref().clone(); - let _ = self.executor.spawn(async move { + let _ = self.executor.native_spawn(async move { operations_client .cancel_operation(CancelOperationRequest { name: operation_name, diff --git a/src/rust/engine/sharded_lmdb/src/lib.rs b/src/rust/engine/sharded_lmdb/src/lib.rs index b6ebd708204c..81c6d06d7fcf 100644 --- a/src/rust/engine/sharded_lmdb/src/lib.rs +++ b/src/rust/engine/sharded_lmdb/src/lib.rs @@ -306,24 +306,27 @@ impl ShardedLmdb { let store = self.clone(); self .executor - .spawn_blocking(move || { - let effective_key = VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION); - let (env, db, _lease_database) = store.get(&fingerprint); - let del_res = env.begin_rw_txn().and_then(|mut txn| { - txn.del(db, &effective_key, None)?; - txn.commit() - }); - - match del_res { - Ok(()) => Ok(true), - Err(lmdb::Error::NotFound) => Ok(false), - Err(err) => Err(format!( - "Error removing versioned key {:?}: {}", - effective_key.to_hex(), - err - )), - } - }) + .spawn_blocking( + move || { + let effective_key = VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION); + let (env, db, _lease_database) = store.get(&fingerprint); + let del_res = env.begin_rw_txn().and_then(|mut txn| { + txn.del(db, &effective_key, None)?; + txn.commit() + }); + + match del_res { + Ok(()) => Ok(true), + Err(lmdb::Error::NotFound) => Ok(false), + Err(err) => Err(format!( + "Error removing versioned key {:?}: {}", + effective_key.to_hex(), + err + )), + } + }, + |e| Err(format!("`remove` task failed: {e}")), + ) .await } @@ -347,52 +350,56 @@ impl ShardedLmdb { let store = self.clone(); self .executor - .spawn_blocking(move || { - // Group the items by the Environment that they will be applied to. - let mut items_by_env = HashMap::new(); - let mut exists = HashSet::new(); - - for fingerprint in &fingerprints { - let effective_key = VersionedFingerprint::new(*fingerprint, ShardedLmdb::SCHEMA_VERSION); - let (env_id, _, env, db, _) = store.get_raw(&fingerprint.0); - - let (_, _, batch) = items_by_env - .entry(*env_id) - .or_insert_with(|| (env.clone(), *db, vec![])); - batch.push(effective_key); - } - - // Open and commit a Transaction per Environment. Since we never have more than one - // Transaction open at a time, we don't have to worry about ordering. 
-        for (_, (env, db, batch)) in items_by_env {
-          env
-            .begin_ro_txn()
-            .and_then(|txn| {
-              for effective_key in &batch {
-                let get_res = txn.get(db, &effective_key);
-                match get_res {
-                  Ok(_) => {
-                    exists.insert(effective_key.get_fingerprint());
-                  }
-                  Err(lmdb::Error::NotFound) => (),
-                  Err(err) => return Err(err),
-                };
-              }
-              txn.commit()
-            })
-            .map_err(|e| {
-              format!(
-                "Error checking existence of fingerprints {:?}: {}",
-                batch
-                  .iter()
-                  .map(|key| key.get_fingerprint())
-                  .collect::<Vec<_>>(),
-                e
-              )
-            })?;
-        }
-        Ok(exists)
-      })
+      .spawn_blocking(
+        move || {
+          // Group the items by the Environment that they will be applied to.
+          let mut items_by_env = HashMap::new();
+          let mut exists = HashSet::new();
+
+          for fingerprint in &fingerprints {
+            let effective_key =
+              VersionedFingerprint::new(*fingerprint, ShardedLmdb::SCHEMA_VERSION);
+            let (env_id, _, env, db, _) = store.get_raw(&fingerprint.0);
+
+            let (_, _, batch) = items_by_env
+              .entry(*env_id)
+              .or_insert_with(|| (env.clone(), *db, vec![]));
+            batch.push(effective_key);
+          }
+
+          // Open and commit a Transaction per Environment. Since we never have more than one
+          // Transaction open at a time, we don't have to worry about ordering.
+          for (_, (env, db, batch)) in items_by_env {
+            env
+              .begin_ro_txn()
+              .and_then(|txn| {
+                for effective_key in &batch {
+                  let get_res = txn.get(db, &effective_key);
+                  match get_res {
+                    Ok(_) => {
+                      exists.insert(effective_key.get_fingerprint());
+                    }
+                    Err(lmdb::Error::NotFound) => (),
+                    Err(err) => return Err(err),
+                  };
+                }
+                txn.commit()
+              })
+              .map_err(|e| {
+                format!(
+                  "Error checking existence of fingerprints {:?}: {}",
+                  batch
+                    .iter()
+                    .map(|key| key.get_fingerprint())
+                    .collect::<Vec<_>>(),
+                  e
+                )
+              })?;
+          }
+          Ok(exists)
+        },
+        |e| Err(format!("`exists_batch` task failed: {e}")),
+      )
       .await
   }
 
@@ -426,60 +433,63 @@ impl ShardedLmdb {
     let store = self.clone();
     self
       .executor
-      .spawn_blocking(move || {
-        // Group the items by the Environment that they will be applied to.
-        let mut items_by_env = HashMap::new();
-        let mut fingerprints = Vec::new();
-        for (maybe_fingerprint, bytes) in items {
-          let fingerprint = maybe_fingerprint.unwrap_or_else(|| Digest::of_bytes(&bytes).hash);
-          let effective_key = VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION);
-          let (env_id, _, env, db, lease_database) = store.get_raw(&fingerprint.0);
-
-          let (_, _, _, batch) = items_by_env
-            .entry(*env_id)
-            .or_insert_with(|| (env.clone(), *db, *lease_database, vec![]));
-          batch.push((effective_key, bytes));
-          fingerprints.push(fingerprint);
-        }
-
-        // Open and commit a Transaction per Environment. Since we never have more than one
-        // Transaction open at a time, we don't have to worry about ordering.
-        for (_, (env, db, lease_database, batch)) in items_by_env {
-          env
-            .begin_rw_txn()
-            .and_then(|mut txn| {
-              for (effective_key, bytes) in &batch {
-                let put_res = txn.put(db, &effective_key, &bytes, WriteFlags::NO_OVERWRITE);
-                match put_res {
-                  Ok(()) => (),
-                  Err(lmdb::Error::KeyExist) => continue,
-                  Err(err) => return Err(err),
-                }
-                if initial_lease {
-                  store.lease_inner(
-                    lease_database,
-                    effective_key,
-                    store.lease_until_secs_since_epoch(),
-                    &mut txn,
-                  )?;
+      .spawn_blocking(
+        move || {
+          // Group the items by the Environment that they will be applied to.
+          let mut items_by_env = HashMap::new();
+          let mut fingerprints = Vec::new();
+          for (maybe_fingerprint, bytes) in items {
+            let fingerprint = maybe_fingerprint.unwrap_or_else(|| Digest::of_bytes(&bytes).hash);
+            let effective_key = VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION);
+            let (env_id, _, env, db, lease_database) = store.get_raw(&fingerprint.0);
+
+            let (_, _, _, batch) = items_by_env
+              .entry(*env_id)
+              .or_insert_with(|| (env.clone(), *db, *lease_database, vec![]));
+            batch.push((effective_key, bytes));
+            fingerprints.push(fingerprint);
+          }
+
+          // Open and commit a Transaction per Environment. Since we never have more than one
+          // Transaction open at a time, we don't have to worry about ordering.
+          for (_, (env, db, lease_database, batch)) in items_by_env {
+            env
+              .begin_rw_txn()
+              .and_then(|mut txn| {
+                for (effective_key, bytes) in &batch {
+                  let put_res = txn.put(db, &effective_key, &bytes, WriteFlags::NO_OVERWRITE);
+                  match put_res {
+                    Ok(()) => (),
+                    Err(lmdb::Error::KeyExist) => continue,
+                    Err(err) => return Err(err),
+                  }
+                  if initial_lease {
+                    store.lease_inner(
+                      lease_database,
+                      effective_key,
+                      store.lease_until_secs_since_epoch(),
+                      &mut txn,
+                    )?;
+                  }
                 }
-              }
-              txn.commit()
-            })
-            .map_err(|e| {
-              format!(
-                "Error storing fingerprints {:?}: {}",
-                batch
-                  .iter()
-                  .map(|(key, _)| key.to_hex())
-                  .collect::<Vec<_>>(),
-                e
-              )
-            })?;
-        }
-
-        Ok(fingerprints)
-      })
+                txn.commit()
+              })
+              .map_err(|e| {
+                format!(
+                  "Error storing fingerprints {:?}: {}",
+                  batch
+                    .iter()
+                    .map(|(key, _)| key.to_hex())
+                    .collect::<Vec<_>>(),
+                  e
+                )
+              })?;
+          }
+
+          Ok(fingerprints)
+        },
+        |e| Err(format!("`store_bytes_batch` task failed: {e}")),
+      )
       .await
   }
 
@@ -504,116 +514,127 @@ impl ShardedLmdb {
     let store = self.clone();
     self
       .executor
-      .spawn_blocking(move || {
-        let mut attempts = 0;
-        loop {
-          // First pass: compute the Digest.
-          let digest = {
-            let mut read = data_provider().map_err(|e| format!("Failed to read: {}", e))?;
-            let mut hasher = WriterHasher::new(io::sink());
-            let _ = io::copy(&mut read, &mut hasher)
-              .map_err(|e| format!("Failed to read from {:?}: {}", read, e))?;
-            hasher.finish().0
-          };
-
-          let effective_key = VersionedFingerprint::new(digest.hash, ShardedLmdb::SCHEMA_VERSION);
-          let (env, db, lease_database) = store.get(&digest.hash);
-          let put_res: Result<(), StoreError> = env
-            .begin_rw_txn()
-            .map_err(StoreError::Lmdb)
-            .and_then(|mut txn| {
-              // Second pass: copy into the reserved memory.
-              let mut writer = txn
-                .reserve(
-                  db,
-                  &effective_key,
-                  digest.size_bytes,
-                  WriteFlags::NO_OVERWRITE,
-                )?
-                .writer();
+      .spawn_blocking(
+        move || {
+          let mut attempts = 0;
+          loop {
+            // First pass: compute the Digest.
+            let digest = {
              let mut read = data_provider().map_err(|e| format!("Failed to read: {}", e))?;
-            let should_retry = if data_is_immutable {
-              // Trust that the data hasn't changed, and only validate its length.
-              let copied = io::copy(&mut read, &mut writer).map_err(|e| {
-                format!(
-                  "Failed to copy from {:?} or store in {:?}: {:?}",
-                  read, env, e
-                )
-              })?;
-
-              // Should retry if the file got shorter between reads.
-              copied as usize != digest.size_bytes
-            } else {
-              // Confirm that the data hasn't changed.
-              let mut hasher = WriterHasher::new(writer);
-              let _ = io::copy(&mut read, &mut hasher).map_err(|e| {
-                format!(
-                  "Failed to copy from {:?} or store in {:?}: {:?}",
-                  read, env, e
-                )
-              })?;
-
-              // Should retry if the Digest changed between reads.
- digest != hasher.finish().0 - }; - - if should_retry { - let msg = format!("Input {:?} changed while reading.", read); - log::debug!("{}", msg); - return Err(StoreError::Retry(msg)); - } + let mut hasher = WriterHasher::new(io::sink()); + let _ = io::copy(&mut read, &mut hasher) + .map_err(|e| format!("Failed to read from {:?}: {}", read, e))?; + hasher.finish().0 + }; + + let effective_key = VersionedFingerprint::new(digest.hash, ShardedLmdb::SCHEMA_VERSION); + let (env, db, lease_database) = store.get(&digest.hash); + let put_res: Result<(), StoreError> = env + .begin_rw_txn() + .map_err(StoreError::Lmdb) + .and_then(|mut txn| { + // Second pass: copy into the reserved memory. + let mut writer = txn + .reserve( + db, + &effective_key, + digest.size_bytes, + WriteFlags::NO_OVERWRITE, + )? + .writer(); + let mut read = data_provider().map_err(|e| format!("Failed to read: {}", e))?; + let should_retry = if data_is_immutable { + // Trust that the data hasn't changed, and only validate its length. + let copied = io::copy(&mut read, &mut writer).map_err(|e| { + format!( + "Failed to copy from {:?} or store in {:?}: {:?}", + read, env, e + ) + })?; + + // Should retry if the file got shorter between reads. + copied as usize != digest.size_bytes + } else { + // Confirm that the data hasn't changed. + let mut hasher = WriterHasher::new(writer); + let _ = io::copy(&mut read, &mut hasher).map_err(|e| { + format!( + "Failed to copy from {:?} or store in {:?}: {:?}", + read, env, e + ) + })?; + + // Should retry if the Digest changed between reads. + digest != hasher.finish().0 + }; - if initial_lease { - store.lease_inner( - lease_database, - &effective_key, - store.lease_until_secs_since_epoch(), - &mut txn, - )?; + if should_retry { + let msg = format!("Input {:?} changed while reading.", read); + log::debug!("{}", msg); + return Err(StoreError::Retry(msg)); + } + + if initial_lease { + store.lease_inner( + lease_database, + &effective_key, + store.lease_until_secs_since_epoch(), + &mut txn, + )?; + } + txn.commit()?; + Ok(()) + }); + + match put_res { + Ok(()) => return Ok(digest), + Err(StoreError::Retry(msg)) => { + // Input changed during reading: maybe retry. + if attempts > 10 { + return Err(msg); + } else { + attempts += 1; + continue; + } + } + Err(StoreError::Lmdb(lmdb::Error::KeyExist)) => return Ok(digest), + Err(StoreError::Lmdb(err)) => { + return Err(format!("Error storing {:?}: {}", digest, err)) } - txn.commit()?; - Ok(()) - }); - - match put_res { - Ok(()) => return Ok(digest), - Err(StoreError::Retry(msg)) => { - // Input changed during reading: maybe retry. 
-              if attempts > 10 {
-                return Err(msg);
-              } else {
-                attempts += 1;
-                continue;
+                if attempts > 10 {
+                  return Err(msg);
+                } else {
+                  attempts += 1;
+                  continue;
+                }
+              }
+              Err(StoreError::Lmdb(lmdb::Error::KeyExist)) => return Ok(digest),
+              Err(StoreError::Lmdb(err)) => {
+                return Err(format!("Error storing {:?}: {}", digest, err))
               }
-            }
-            Err(StoreError::Lmdb(lmdb::Error::KeyExist)) => return Ok(digest),
-            Err(StoreError::Lmdb(err)) => {
-              return Err(format!("Error storing {:?}: {}", digest, err))
-            }
-            Err(StoreError::Io(err)) => return Err(format!("Error storing {:?}: {}", digest, err)),
-          };
-        }
-      })
+              Err(StoreError::Io(err)) => {
+                return Err(format!("Error storing {:?}: {}", digest, err))
+              }
+            };
+          }
+        },
+        |e| Err(format!("`store` task failed: {e}")),
+      )
       .await
   }
 
-  pub async fn lease(&self, fingerprint: Fingerprint) -> Result<(), lmdb::Error> {
+  pub async fn lease(&self, fingerprint: Fingerprint) -> Result<(), String> {
     let store = self.clone();
     self
       .executor
-      .spawn_blocking(move || {
-        let until_secs_since_epoch: u64 = store.lease_until_secs_since_epoch();
-        let (env, _, lease_database) = store.get(&fingerprint);
-        env.begin_rw_txn().and_then(|mut txn| {
-          store.lease_inner(
-            lease_database,
-            &VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION),
-            until_secs_since_epoch,
-            &mut txn,
-          )?;
-          txn.commit()
-        })
-      })
+      .spawn_blocking(
+        move || {
+          let until_secs_since_epoch: u64 = store.lease_until_secs_since_epoch();
+          let (env, _, lease_database) = store.get(&fingerprint);
+          env
+            .begin_rw_txn()
+            .and_then(|mut txn| {
+              store.lease_inner(
+                lease_database,
+                &VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION),
+                until_secs_since_epoch,
+                &mut txn,
+              )?;
+              txn.commit()
+            })
+            .map_err(|e| format!("Error leasing {fingerprint:?}: {e}"))
+        },
+        |e| Err(format!("`lease` task failed: {e}")),
+      )
       .await
   }
 
@@ -651,21 +672,24 @@ impl ShardedLmdb {
     let effective_key = VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION);
     self
       .executor
-      .spawn_blocking(move || {
-        let (env, db, _) = store.get(&fingerprint);
-        let ro_txn = env
-          .begin_ro_txn()
-          .map_err(|err| format!("Failed to begin read transaction: {}", err))?;
-        match ro_txn.get(db, &effective_key) {
-          Ok(bytes) => f(bytes).map(Some),
-          Err(lmdb::Error::NotFound) => Ok(None),
-          Err(err) => Err(format!(
-            "Error loading versioned key {:?}: {}",
-            effective_key.to_hex(),
-            err,
-          )),
-        }
-      })
+      .spawn_blocking(
+        move || {
+          let (env, db, _) = store.get(&fingerprint);
+          let ro_txn = env
+            .begin_ro_txn()
+            .map_err(|err| format!("Failed to begin read transaction: {}", err))?;
+          match ro_txn.get(db, &effective_key) {
+            Ok(bytes) => f(bytes).map(Some),
+            Err(lmdb::Error::NotFound) => Ok(None),
+            Err(err) => Err(format!(
+              "Error loading versioned key {:?}: {}",
+              effective_key.to_hex(),
+              err,
+            )),
+          }
+        },
+        |e| Err(format!("`load_bytes_with` task failed: {e}")),
+      )
      .await
   }
 
diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs
index 4dde9252e227..828b7f19b27d 100644
--- a/src/rust/engine/src/context.rs
+++ b/src/rust/engine/src/context.rs
@@ -882,6 +882,6 @@ impl NodeContext for Context {
   where
     F: Future<Output = ()> + Send + 'static,
   {
-    let _join = self.core.executor.spawn(future);
+    let _join = self.core.executor.native_spawn(future);
   }
 }
diff --git a/src/rust/engine/src/session.rs b/src/rust/engine/src/session.rs
index 984ae732aec9..92f12bd579d9 100644
--- a/src/rust/engine/src/session.rs
+++ b/src/rust/engine/src/session.rs
@@ -12,13 +12,14 @@ use crate::nodes::{NodeKey, Select};
 use crate::python::{Failure, Value};
 
 use async_latch::AsyncLatch;
-use futures::future::{self, AbortHandle, Abortable, FutureExt};
+use futures::future::{self, FutureExt};
 use graph::LastObserved;
 use log::warn;
 use parking_lot::Mutex;
 use pyo3::prelude::*;
 use task_executor::{Executor, TailTasks};
 use tokio::signal::unix::{signal, SignalKind};
+use tokio::task::JoinHandle;
 use ui::ConsoleUI;
 use workunit_store::{format_workunit_duration_ms, RunId, WorkunitStore};
 
@@ -392,7 +393,7 @@ pub struct Sessions {
   /// Core/Scheduler are being shut down.
   sessions: Arc<Mutex<Option<Vec<Weak<SessionState>>>>>,
   /// Handle to kill the signal monitoring task when this object is killed.
-  signal_task_abort_handle: AbortHandle,
+  signal_task_handle: JoinHandle<()>,
   /// A generator for RunId values. Although this is monotonic, there is no meaning assigned to
   /// ordering: only equality is relevant.
   run_id_generator: AtomicU32,
@@ -404,40 +405,34 @@ impl Sessions {
       Arc::new(Mutex::new(Some(Vec::new())));
     // A task that watches for keyboard interrupts arriving at this process, and cancels all
     // non-isolated Sessions.
-    let signal_task_abort_handle = {
+    let signal_task_handle = {
       let mut signal_stream = signal(SignalKind::interrupt())
         .map_err(|err| format!("Failed to install interrupt handler: {}", err))?;
-      let (abort_handle, abort_registration) = AbortHandle::new_pair();
       let sessions = sessions.clone();
-      #[allow(clippy::let_underscore_lock)]
-      let _ = executor.spawn(Abortable::new(
-        async move {
-          loop {
-            let _ = signal_stream.recv().await;
-            let cancellable_sessions = {
-              let sessions = sessions.lock();
-              if let Some(ref sessions) = *sessions {
-                sessions
-                  .iter()
-                  .flat_map(|session| session.upgrade())
-                  .filter(|session| !session.isolated)
-                  .collect::<Vec<_>>()
-              } else {
-                vec![]
-              }
-            };
-            for session in cancellable_sessions {
-              session.cancel();
+      executor.native_spawn(async move {
+        loop {
+          let _ = signal_stream.recv().await;
+          let cancellable_sessions = {
+            let sessions = sessions.lock();
+            if let Some(ref sessions) = *sessions {
+              sessions
+                .iter()
+                .flat_map(|session| session.upgrade())
+                .filter(|session| !session.isolated)
+                .collect::<Vec<_>>()
+            } else {
+              vec![]
             }
+          };
+          for session in cancellable_sessions {
+            session.cancel();
           }
-        },
-        abort_registration,
-      ));
-      abort_handle
+        }
+      })
     };
     Ok(Sessions {
       sessions,
-      signal_task_abort_handle,
+      signal_task_handle,
       run_id_generator: AtomicU32::new(0),
     })
   }
@@ -494,6 +489,6 @@ impl Sessions {
 
 impl Drop for Sessions {
   fn drop(&mut self) {
-    self.signal_task_abort_handle.abort();
+    self.signal_task_handle.abort();
   }
 }
diff --git a/src/rust/engine/task_executor/src/lib.rs b/src/rust/engine/task_executor/src/lib.rs
index 4425372a34ec..efa8097d00f5 100644
--- a/src/rust/engine/task_executor/src/lib.rs
+++ b/src/rust/engine/task_executor/src/lib.rs
@@ -37,7 +37,7 @@ use itertools::Itertools;
 use lazy_static::lazy_static;
 use parking_lot::Mutex;
 use tokio::runtime::{Builder, Handle, Runtime};
-use tokio::task::{Id, JoinSet};
+use tokio::task::{Id, JoinError, JoinHandle, JoinSet};
 
 lazy_static! {
   // Lazily initialized in Executor::global.
@@ -143,7 +143,8 @@ impl Executor {
   ///
   /// Run a Future on a tokio Runtime as a new Task, and return a Future handle to it.
   ///
-  /// Unlike tokio::spawn, if the background Task panics, the returned Future will too.
+  /// If the background Task exits abnormally, the given closure will be called to recover: usually
+  /// it should convert the resulting Error to a relevant error type.
 ///
 /// If the returned Future is dropped, the computation will still continue to completion: see
 ///
@@ -151,11 +152,22 @@
   pub fn spawn<O: Send + 'static, F: Future<Output = O> + Send + 'static>(
     &self,
     future: F,
+    rescue_join_error: impl FnOnce(JoinError) -> O,
   ) -> impl Future<Output = O> {
-    self
-      .handle
-      .spawn(future_with_correct_context(future))
-      .map(|r| r.expect("Background task exited unsafely."))
+    self.native_spawn(future).map(|res| match res {
+      Ok(o) => o,
+      Err(e) => rescue_join_error(e),
+    })
+  }
+
+  ///
+  /// Run a Future on a tokio Runtime as a new Task, and return a JoinHandle.
+  ///
+  pub fn native_spawn<O: Send + 'static, F: Future<Output = O> + Send + 'static>(
+    &self,
+    future: F,
+  ) -> JoinHandle<O> {
+    self.handle.spawn(future_with_correct_context(future))
   }
 
   ///
@@ -178,8 +190,8 @@ impl Executor {
   /// Spawn a Future on a threadpool specifically reserved for I/O tasks which are allowed to be
   /// long-running.
   ///
-  /// Unlike tokio::task::spawn_blocking, If the background Task panics, the returned Future will
-  /// too.
+  /// If the background Task exits abnormally, the given closure will be called to recover: usually
+  /// it should convert the resulting Error to a relevant error type.
   ///
   /// If the returned Future is dropped, the computation will still continue to completion: see
   ///
@@ -187,19 +199,29 @@
   pub fn spawn_blocking<F: FnOnce() -> R + Send + 'static, R: Send + 'static>(
     &self,
     f: F,
+    rescue_join_error: impl FnOnce(JoinError) -> R,
   ) -> impl Future<Output = R> {
+    self.native_spawn_blocking(f).map(|res| match res {
+      Ok(o) => o,
+      Err(e) => rescue_join_error(e),
+    })
+  }
+
+  ///
+  /// Spawn a Future on threads specifically reserved for I/O tasks which are allowed to be
+  /// long-running, and return a JoinHandle.
+  ///
+  pub fn native_spawn_blocking<F: FnOnce() -> R + Send + 'static, R: Send + 'static>(
+    &self,
+    f: F,
+  ) -> JoinHandle<R> {
     let stdio_destination = stdio::get_destination();
     let workunit_store_handle = workunit_store::get_workunit_store_handle();
-    // NB: We unwrap here because the only thing that should cause an error in a spawned task is a
-    // panic, in which case we want to propagate that.
-    self
-      .handle
-      .spawn_blocking(move || {
-        stdio::set_thread_destination(stdio_destination);
-        workunit_store::set_thread_workunit_store_handle(workunit_store_handle);
-        f()
-      })
-      .map(|r| r.expect("Background task exited unsafely."))
+    self.handle.spawn_blocking(move || {
+      stdio::set_thread_destination(stdio_destination);
+      workunit_store::set_thread_workunit_store_handle(workunit_store_handle);
+      f()
+    })
   }
 
   /// Return a reference to this executor's runtime handle.
diff --git a/src/rust/engine/ui/src/console_ui.rs b/src/rust/engine/ui/src/console_ui.rs
index d8f488fb2050..ea21afd60617 100644
--- a/src/rust/engine/ui/src/console_ui.rs
+++ b/src/rust/engine/ui/src/console_ui.rs
@@ -42,6 +42,8 @@ use terminal_size::terminal_size_using_fd;
 use prodash::progress::Step;
 use prodash::render::line;
 use prodash::{Root, TreeOptions};
+
+use logging::fatal_log;
 use task_executor::Executor;
 use workunit_store::{format_workunit_duration_ms, SpanId, WorkunitStore};
 
@@ -226,7 +228,7 @@ impl Instance {
     // TODO: There is a shutdown race here, where if the UI is torn down before exclusive access is
     // dropped, we might drop stderr on the floor. That likely causes:
    //   https://github.com/pantsbuild/pants/issues/13276
-    let _stderr_task = executor.spawn_blocking({
+    let _stderr_task = executor.native_spawn_blocking({
       let mut tree = tree.clone();
       move || {
         while let Ok(stderr) = stderr_receiver.recv() {
@@ -406,7 +408,10 @@ impl Instance {
     prodash
       .executor
       .clone()
-      .spawn_blocking(move || prodash.handle.shutdown_and_wait())
+      .spawn_blocking(
+        move || prodash.handle.shutdown_and_wait(),
+        |e| fatal_log!("Failed to teardown UI: {e}"),
+      )
       .boxed()
   }
 }
diff --git a/src/rust/engine/watch/src/lib.rs b/src/rust/engine/watch/src/lib.rs
index 1d30990aa297..b802b5432494 100644
--- a/src/rust/engine/watch/src/lib.rs
+++ b/src/rust/engine/watch/src/lib.rs
@@ -311,13 +311,16 @@ impl InvalidationWatcher {
     let watcher = self.clone();
 
     executor
-      .spawn_blocking(move || {
-        let mut inner = watcher.0.lock();
-        inner
-          .watcher
-          .watch(&path, RecursiveMode::NonRecursive)
-          .map_err(|e| maybe_enrich_notify_error(&path, e))
-      })
+      .spawn_blocking(
+        move || {
+          let mut inner = watcher.0.lock();
+          inner
+            .watcher
+            .watch(&path, RecursiveMode::NonRecursive)
+            .map_err(|e| maybe_enrich_notify_error(&path, e))
+        },
+        |e| Err(format!("Watch attempt failed: {e}")),
+      )
      .await
  }
}
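Usage sketch (illustrative, not part of the patch): after this change, `spawn` and
`spawn_blocking` take a second closure which runs only if the background task fails to
join (it panicked, or was cancelled during a racing shutdown), converting the
`tokio::task::JoinError` into the task's normal output type instead of panicking in the
caller. The `native_spawn`/`native_spawn_blocking` variants return the raw tokio
`JoinHandle` for callers that want to detach, abort, or inspect the task themselves.
A hypothetical caller might look like the following; the `materialize` function and its
error strings are invented for illustration:

    use task_executor::Executor;

    async fn materialize(executor: &Executor) -> Result<String, String> {
      executor
        .spawn_blocking(
          // The blocking work itself, run on the reserved I/O threadpool.
          || Ok("wrote sandbox".to_owned()),
          // The rescue closure: invoked only on JoinError, so a Scheduler
          // shutdown racing with this task surfaces as an Err, not a panic.
          |join_error| Err(format!("Materialization task failed: {join_error}")),
        )
        .await
    }

The design trade-off is that every `spawn`/`spawn_blocking` call site must now decide
explicitly what an abnormal exit means for it, while fire-and-forget call sites migrate
to `native_spawn`/`native_spawn_blocking` and simply hold or drop the `JoinHandle`.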