diff --git a/Cargo.lock b/Cargo.lock index c52c3b20e1..14c9e776db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1475,6 +1475,7 @@ dependencies = [ "home", "lazy_static", "libc", + "num_cpus", "opener", "openssl", "pgp", diff --git a/Cargo.toml b/Cargo.toml index d1c0cae22e..86b11f531e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ git-testament = "0.1.4" home = "0.5" lazy_static = "1" libc = "0.2" +num_cpus = "1.13" opener = "0.4.0" # Used by `curl` or `reqwest` backend although it isn't imported # by our rustup. diff --git a/README.md b/README.md index c4864e462b..92ae3dbebb 100644 --- a/README.md +++ b/README.md @@ -697,7 +697,7 @@ currently can define only `default_toolchain`. Sets the root URL for downloading self-updates. - `RUSTUP_IO_THREADS` *unstable* (defaults to reported cpu count). Sets the - number of threads to perform close IO in. Set to `disabled` to force + number of threads to perform close IO in. Set to `1` to force single-threaded IO for troubleshooting, or an arbitrary number to override automatic detection. diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs index a655479397..51932d1660 100644 --- a/src/diskio/mod.rs +++ b/src/diskio/mod.rs @@ -59,6 +59,7 @@ use std::io::{self, Write}; use std::path::{Path, PathBuf}; use std::time::{Duration, Instant}; +use crate::errors::{Result, ResultExt}; use crate::process; use crate::utils::notifications::Notification; @@ -194,20 +195,16 @@ pub fn create_dir>(path: P) -> io::Result<()> { /// Get the executor for disk IO. pub fn get_executor<'a>( notify_handler: Option<&'a dyn Fn(Notification<'_>)>, -) -> Box { +) -> Result> { // If this gets lots of use, consider exposing via the config file. - if let Ok(thread_str) = process().var("RUSTUP_IO_THREADS") { - if thread_str == "disabled" { - Box::new(immediate::ImmediateUnpacker::new()) - } else if let Ok(thread_count) = thread_str.parse::() { - Box::new(threaded::Threaded::new_with_threads( - notify_handler, - thread_count, - )) - } else { - Box::new(threaded::Threaded::new(notify_handler)) - } - } else { - Box::new(threaded::Threaded::new(notify_handler)) - } + let thread_count = match process().var("RUSTUP_IO_THREADS") { + Err(_) => num_cpus::get(), + Ok(n) => n + .parse::() + .chain_err(|| "invalid value in RUSTUP_IO_THREADS. Must be a natural number")?, + }; + Ok(match thread_count { + 0 | 1 => Box::new(immediate::ImmediateUnpacker::new()), + n => Box::new(threaded::Threaded::new(notify_handler, n)), + }) } diff --git a/src/diskio/threaded.rs b/src/diskio/threaded.rs index b71b2ee78b..07a01d03f4 100644 --- a/src/diskio/threaded.rs +++ b/src/diskio/threaded.rs @@ -34,29 +34,7 @@ pub struct Threaded<'a> { } impl<'a> Threaded<'a> { - pub fn new(notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Self { - // Defaults to hardware thread count threads; this is suitable for - // our needs as IO bound operations tend to show up as write latencies - // rather than close latencies, so we don't need to look at - // more threads to get more IO dispatched at this stage in the process. - let pool = threadpool::Builder::new() - .thread_name("CloseHandle".into()) - .thread_stack_size(1_048_576) - .build(); - let (tx, rx) = channel(); - Self { - n_files: Arc::new(AtomicUsize::new(0)), - pool, - notify_handler, - rx, - tx, - } - } - - pub fn new_with_threads( - notify_handler: Option<&'a dyn Fn(Notification<'_>)>, - thread_count: usize, - ) -> Self { + pub fn new(notify_handler: Option<&'a dyn Fn(Notification<'_>)>, thread_count: usize) -> Self { // Defaults to hardware thread count threads; this is suitable for // our needs as IO bound operations tend to show up as write latencies // rather than close latencies, so we don't need to look at diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index 02469a9989..3beddf54fe 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -293,7 +293,7 @@ fn unpack_without_first_dir<'a, R: Read>( path: &Path, notify_handler: Option<&'a dyn Fn(Notification<'_>)>, ) -> Result<()> { - let mut io_executor: Box = get_executor(notify_handler); + let mut io_executor: Box = get_executor(notify_handler)?; let entries = archive .entries() .chain_err(|| ErrorKind::ExtractingPackage)?;