diff --git a/src/cli/self_update.rs b/src/cli/self_update.rs
index fdd0e7cee0..46015f28b5 100644
--- a/src/cli/self_update.rs
+++ b/src/cli/self_update.rs
@@ -413,6 +413,7 @@ pub fn install(
 
 fn rustc_or_cargo_exists_in_path() -> Result<()> {
     // Ignore rustc and cargo if present in $HOME/.cargo/bin or a few other directories
+    #[allow(clippy::ptr_arg)]
    fn ignore_paths(path: &PathBuf) -> bool {
         !path
             .components()
diff --git a/src/cli/self_update/windows.rs b/src/cli/self_update/windows.rs
index 28445c3f31..85682e666f 100644
--- a/src/cli/self_update/windows.rs
+++ b/src/cli/self_update/windows.rs
@@ -180,6 +180,7 @@ fn _apply_new_path(new_path: Option<Vec<u16>>) -> Result<()> {
     }
 
     // Tell other processes to update their environment
+    #[allow(clippy::unnecessary_cast)]
     unsafe {
         SendMessageTimeoutA(
             HWND_BROADCAST,
diff --git a/src/cli/topical_doc.rs b/src/cli/topical_doc.rs
index f9b636a666..c1c04f38c7 100644
--- a/src/cli/topical_doc.rs
+++ b/src/cli/topical_doc.rs
@@ -22,7 +22,7 @@ fn index_html(doc: &DocData<'_>, wpath: &Path) -> Option<PathBuf> {
     }
 }
 
-fn dir_into_vec(dir: &PathBuf) -> Result<Vec<OsString>> {
+fn dir_into_vec(dir: &Path) -> Result<Vec<OsString>> {
     let entries = fs::read_dir(dir).chain_err(|| format!("Opening directory {:?}", dir))?;
     let mut v = Vec::new();
     for entry in entries {
diff --git a/src/config.rs b/src/config.rs
index a873482249..e589ae0425 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -110,12 +110,12 @@ impl<'a> OverrideCfg<'a> {
                         || file.toolchain.components.is_some()
                         || file.toolchain.profile.is_some()
                     {
-                        return Err(ErrorKind::CannotSpecifyPathAndOptions(path.into()).into());
+                        return Err(ErrorKind::CannotSpecifyPathAndOptions(path).into());
                     }
                     Some(Toolchain::from_path(cfg, cfg_path, &path)?)
                 }
                 (Some(channel), Some(path)) => {
-                    return Err(ErrorKind::CannotSpecifyChannelAndPath(channel, path.into()).into())
+                    return Err(ErrorKind::CannotSpecifyChannelAndPath(channel, path).into())
                 }
                 (None, None) => None,
             },
@@ -172,8 +172,7 @@ impl PgpPublicKey {
             Ok(ret)
         }
         use pgp::types::KeyTrait;
-        let mut ret = Vec::new();
-        ret.push(format!("from {}", self));
+        let mut ret = vec![format!("from {}", self)];
         let key = self.key();
         let keyid = format_hex(&key.key_id().to_vec(), "-", 4)?;
         let algo = key.algorithm();
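
Aside: the `&PathBuf` -> `&Path` signature changes above (and the one place that keeps `&PathBuf` behind `#[allow(clippy::ptr_arg)]`) are easier to see in isolation. A minimal, self-contained sketch, not rustup code, with an invented function name: taking `&Path` serves both borrowed `PathBuf`s and plain `Path`s through deref coercion, so the narrower parameter type costs callers nothing.

```rust
use std::path::{Path, PathBuf};

// Taking `&Path`, as `dir_into_vec` does after this patch, accepts both
// `&PathBuf` and `&Path` arguments via deref coercion.
fn components_of(dir: &Path) -> Vec<String> {
    dir.iter().map(|c| c.to_string_lossy().into_owned()).collect()
}

fn main() {
    let owned: PathBuf = PathBuf::from("/tmp/example");
    println!("{:?}", components_of(&owned)); // &PathBuf coerces to &Path
    println!("{:?}", components_of(Path::new("relative/path")));
}
```
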
diff --git a/src/diskio/immediate.rs b/src/diskio/immediate.rs
index 8c154fc3ff..a778cfd20c 100644
--- a/src/diskio/immediate.rs
+++ b/src/diskio/immediate.rs
@@ -2,27 +2,204 @@
 ///
 /// Use for diagnosing bugs or working around any unexpected issues with the
 /// threaded code paths.
-use super::{perform, Executor, Item};
+use std::{
+    fmt::Debug,
+    fs::{File, OpenOptions},
+    io::{self, Write},
+    path::Path,
+    sync::{Arc, Mutex},
+    time::Instant,
+};
+
+use super::{CompletedIo, Executor, Item};
+
+#[derive(Debug)]
+pub struct _IncrementalFileState {
+    completed_chunks: Vec<usize>,
+    err: Option<io::Result<()>>,
+    item: Option<Item>,
+    finished: bool,
+}
+
+pub(super) type IncrementalFileState = Arc<Mutex<Option<_IncrementalFileState>>>;
+
+#[derive(Default, Debug)]
+pub struct ImmediateUnpacker {
+    incremental_state: IncrementalFileState,
+}
 
-#[derive(Default)]
-pub struct ImmediateUnpacker {}
 impl ImmediateUnpacker {
     pub fn new() -> Self {
-        Self {}
+        Self {
+            ..Default::default()
+        }
+    }
+
+    fn deque(&self) -> Box<dyn Iterator<Item = CompletedIo>> {
+        let mut guard = self.incremental_state.lock().unwrap();
+        // incremental file in progress
+        if let Some(ref mut state) = *guard {
+            // Case 1: pending errors
+            if state.finished {
+                let mut item = state.item.take().unwrap();
+                if state.err.is_some() {
+                    let err = state.err.take().unwrap();
+                    item.result = err;
+                }
+                item.finish = item
+                    .start
+                    .map(|s| Instant::now().saturating_duration_since(s));
+                if state.finished {
+                    *guard = None;
+                }
+                Box::new(Some(CompletedIo::Item(item)).into_iter())
+            } else {
+                // Case 2: pending chunks (which might be empty)
+                let mut completed_chunks = vec![];
+                completed_chunks.append(&mut state.completed_chunks);
+                Box::new(completed_chunks.into_iter().map(CompletedIo::Chunk))
+            }
+        } else {
+            Box::new(None.into_iter())
+        }
     }
 }
 
 impl Executor for ImmediateUnpacker {
-    fn dispatch(&self, mut item: Item) -> Box<dyn Iterator<Item = Item> + '_> {
-        perform(&mut item);
-        Box::new(Some(item).into_iter())
+    fn dispatch(&self, mut item: Item) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
+        item.result = match &mut item.kind {
+            super::Kind::Directory => super::create_dir(&item.full_path),
+            super::Kind::File(ref contents) => {
+                super::write_file(&item.full_path, &contents, item.mode)
+            }
+            super::Kind::IncrementalFile(_incremental_file) => {
+                return {
+                    // If there is a pending error, return it, otherwise stash the
+                    // Item for eventual return when the file is finished.
+                    let mut guard = self.incremental_state.lock().unwrap();
+                    if let Some(ref mut state) = *guard {
+                        if state.err.is_some() {
+                            let err = state.err.take().unwrap();
+                            item.result = err;
+                            item.finish = item
+                                .start
+                                .map(|s| Instant::now().saturating_duration_since(s));
+                            *guard = None;
+                            Box::new(Some(CompletedIo::Item(item)).into_iter())
+                        } else {
+                            state.item = Some(item);
+                            Box::new(None.into_iter())
+                        }
+                    } else {
+                        unreachable!();
+                    }
+                };
+            }
+        };
+        item.finish = item
+            .start
+            .map(|s| Instant::now().saturating_duration_since(s));
+        Box::new(Some(CompletedIo::Item(item)).into_iter())
+    }
+
+    fn join(&mut self) -> Box<dyn Iterator<Item = CompletedIo>> {
+        self.deque()
     }
 
-    fn join(&mut self) -> Box<dyn Iterator<Item = Item>> {
-        Box::new(None.into_iter())
+    fn completed(&self) -> Box<dyn Iterator<Item = CompletedIo>> {
+        self.deque()
     }
 
-    fn completed(&self) -> Box<dyn Iterator<Item = Item>> {
-        Box::new(None.into_iter())
+    fn incremental_file_state(&self) -> super::IncrementalFileState {
+        let mut state = self.incremental_state.lock().unwrap();
+        if state.is_some() {
+            unreachable!();
+        } else {
+            *state = Some(_IncrementalFileState {
+                completed_chunks: vec![],
+                err: None,
+                item: None,
+                finished: false,
+            });
+            super::IncrementalFileState::Immediate(self.incremental_state.clone())
+        }
+    }
+}
+
+/// The non-shared state for writing a file incrementally
+#[derive(Debug)]
+pub(super) struct IncrementalFileWriter {
+    state: IncrementalFileState,
+    file: Option<File>,
+    path_display: String,
+}
+
+impl IncrementalFileWriter {
+    #[allow(unused_variables)]
+    pub fn new<P: AsRef<Path>>(
+        path: P,
+        mode: u32,
+        state: IncrementalFileState,
+    ) -> std::result::Result<IncrementalFileWriter, io::Error> {
+        let mut opts = OpenOptions::new();
+        #[cfg(unix)]
+        {
+            use std::os::unix::fs::OpenOptionsExt;
+            opts.mode(mode);
+        }
+        let path = path.as_ref();
+        let path_display = format!("{}", path.display());
+        let file = Some({
+            trace_scoped!("creat", "name": path_display);
+            opts.write(true).create(true).truncate(true).open(path)?
+        });
+        Ok(IncrementalFileWriter {
+            file,
+            state,
+            path_display,
+        })
+    }
+
+    pub fn chunk_submit(&mut self, chunk: Vec<u8>) -> bool {
+        if (self.state.lock().unwrap()).is_none() {
+            return false;
+        }
+        match self.write(chunk) {
+            Ok(v) => v,
+            Err(e) => {
+                let mut state = self.state.lock().unwrap();
+                if let Some(ref mut state) = *state {
+                    state.err.replace(Err(e));
+                    state.finished = true;
+                    false
+                } else {
+                    false
+                }
+            }
+        }
+    }
+
+    fn write(&mut self, chunk: Vec<u8>) -> std::result::Result<bool, io::Error> {
+        let mut state = self.state.lock().unwrap();
+        if let Some(ref mut state) = *state {
+            if let Some(ref mut file) = (&mut self.file).as_mut() {
+                // Length 0 vector is used for clean EOF signalling.
+                if chunk.is_empty() {
+                    trace_scoped!("close", "name:": self.path_display);
+                    drop(std::mem::take(&mut self.file));
+                    state.finished = true;
+                } else {
+                    trace_scoped!("write_segment", "name": self.path_display, "len": chunk.len());
+                    file.write_all(&chunk)?;
+
+                    state.completed_chunks.push(chunk.len());
+                }
+                Ok(true)
+            } else {
+                Ok(false)
+            }
+        } else {
+            unreachable!();
+        }
     }
 }
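
The `ImmediateUnpacker` above hands an `Arc<Mutex<Option<...>>>` back and forth between the writer and the executor. A toy standalone version of that handoff, with simplified types and invented names, none of it rustup code: completions accumulate under the lock and are drained exactly once, which is the shape of `deque()`.

```rust
use std::sync::{Arc, Mutex};

// Shared incremental-write state: None means "no file in progress".
type Shared = Arc<Mutex<Option<Vec<usize>>>>;

// Writer side: record a completed chunk, as IncrementalFileWriter::write does.
fn submit(state: &Shared, len: usize) {
    if let Some(chunks) = state.lock().unwrap().as_mut() {
        chunks.push(len);
    }
}

// Executor side: drain accumulated completions, as deque() does.
fn drain(state: &Shared) -> Vec<usize> {
    state
        .lock()
        .unwrap()
        .as_mut()
        .map(std::mem::take)
        .unwrap_or_default()
}

fn main() {
    let state: Shared = Arc::new(Mutex::new(Some(Vec::new())));
    let writer_handle = Arc::clone(&state); // the writer keeps its own clone
    submit(&writer_handle, 10);
    submit(&writer_handle, 10);
    assert_eq!(drain(&state), vec![10, 10]); // drained once...
    assert!(drain(&state).is_empty()); // ...then empty until more chunks land
}
```
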
diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs
index cc2ae9e65e..09ccdc1a66 100644
--- a/src/diskio/mod.rs
+++ b/src/diskio/mod.rs
@@ -52,23 +52,74 @@
 // f) data gathering: record (name, bytes, start, duration)
 //    write to disk afterwards as a csv file?
 pub mod immediate;
+#[cfg(test)]
+mod test;
 pub mod threaded;
 
-use std::fs::OpenOptions;
 use std::io::{self, Write};
 use std::path::{Path, PathBuf};
+use std::sync::mpsc::Receiver;
 use std::time::{Duration, Instant};
+use std::{fmt::Debug, fs::OpenOptions};
 
 use crate::errors::{Result, ResultExt};
 use crate::process;
 use crate::utils::notifications::Notification;
 
+/// Carries the implementation specific channel data into the executor.
+#[derive(Debug)]
+pub enum IncrementalFile {
+    ImmediateReceiver,
+    ThreadedReceiver(Receiver<Vec<u8>>),
+}
+
+// The basic idea is that in single threaded mode we get this pattern:
+// package budget io-layer
+// +<-claim->
+// +-submit--------+ | write
+// +-complete------+
+// +
+// .. loop ..
+// In thread mode with lots of memory we want the following:
+// +<-claim->
+// +-submit--------+
+// +<-claim->
+// +-submit--------+
+// .. loop .. | writes
+// +-complete------+
+// +
+// +-complete------+
+// +
+// In thread mode with limited memory we want the following:
+// +<-claim->
+// +-submit--------+
+// +<-claim->
+// +-submit--------+
+// .. loop up to budget .. | writes
+// +-complete------+
+// +
+// +<-claim->
+// +-submit--------+
+// .. loop etc ..
+//
+// Lastly we want pending IOs such as directory creation to be able to complete in the same way, so a chunk completion
+// needs to be able to report back in the same fashion; folding it into the same enum will make the driver code easier to write.
+//
+// The implementation is done via a pair of MPSC channels: one to send data to write, and one to carry
+// acknowledgements back. In the immediate model, acknowledgements are sent after doing the write immediately. In the
+// threaded model, acknowledgements are sent after the write completes in the thread pool handler. In the packages code
+// the inner loop that handles iops and continues processing incremental mode files handles the connection between the
+// acks and the budget. Error reporting is passed through the regular completion port, to avoid creating a new special
+// case.
+
+/// What kind of IO operation to perform
 #[derive(Debug)]
 pub enum Kind {
     Directory,
     File(Vec<u8>),
+    IncrementalFile(IncrementalFile),
 }
 
+/// The details of the IO operation
 #[derive(Debug)]
 pub struct Item {
     /// The path to operate on
@@ -81,12 +132,20 @@ pub struct Item {
     pub finish: Option<Duration>,
     /// The length of the file, for files (for stats)
     pub size: Option<usize>,
-    /// The result of the operation
+    /// The result of the operation (could now be factored into CompletedIO...)
     pub result: io::Result<()>,
     /// The mode to apply
     pub mode: u32,
 }
 
+#[derive(Debug)]
+pub enum CompletedIo {
+    /// A submitted Item has completed
+    Item(Item),
+    /// An IncrementalFile has completed a single chunk
+    Chunk(usize),
+}
+
 impl Item {
     pub fn make_dir(full_path: PathBuf, mode: u32) -> Self {
         Self {
@@ -112,6 +171,61 @@ impl Item {
             mode,
         }
     }
+
+    pub fn write_file_segmented<'a>(
+        full_path: PathBuf,
+        mode: u32,
+        state: IncrementalFileState,
+    ) -> Result<(Self, Box<dyn FnMut(Vec<u8>) -> bool + 'a>)> {
+        let (chunk_submit, content_callback) = state.incremental_file_channel(&full_path, mode)?;
+        let result = Self {
+            full_path,
+            kind: Kind::IncrementalFile(content_callback),
+            start: None,
+            finish: None,
+            size: None,
+            result: Ok(()),
+            mode,
+        };
+        Ok((result, Box::new(chunk_submit)))
+    }
+}
+
+// This could be a boxed trait object perhaps... but since we're looking at
+// rewriting this all into an aio layer anyway, and not looking at plugging
+// different backends in at this time, it can keep.
+/// Implementation specific state for incremental file writes. This effectively
+/// just allows the immediate codepath to get access to the Arc referenced state
+/// without holding a lifetime reference to the executor, as the threaded code
+/// path is all message passing.
+pub enum IncrementalFileState {
+    Threaded,
+    Immediate(immediate::IncrementalFileState),
+}
+
+impl IncrementalFileState {
+    /// Get a channel for submitting incremental file chunks to the executor
+    fn incremental_file_channel(
+        &self,
+        path: &Path,
+        mode: u32,
+    ) -> Result<(Box<dyn FnMut(Vec<u8>) -> bool>, IncrementalFile)> {
+        use std::sync::mpsc::channel;
+        match *self {
+            IncrementalFileState::Threaded => {
+                let (tx, rx) = channel::<Vec<u8>>();
+                let content_callback = IncrementalFile::ThreadedReceiver(rx);
+                let chunk_submit = move |chunk: Vec<u8>| tx.send(chunk).is_ok();
+                Ok((Box::new(chunk_submit), content_callback))
+            }
+            IncrementalFileState::Immediate(ref state) => {
+                let content_callback = IncrementalFile::ImmediateReceiver;
+                let mut writer = immediate::IncrementalFileWriter::new(path, mode, state.clone())?;
+                let chunk_submit = move |chunk: Vec<u8>| writer.chunk_submit(chunk);
+                Ok((Box::new(chunk_submit), content_callback))
+            }
+        }
+    }
 }
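
For the threaded arm above, `chunk_submit` is just `tx.send(chunk).is_ok()`, so a hung-up receiver surfaces as `false` rather than a panic. A standalone sketch of that contract (the thread body is invented for illustration, not rustup's writer):

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel::<Vec<u8>>();
    // Same shape as the ThreadedReceiver arm: submission reports success.
    let mut chunk_submit = move |chunk: Vec<u8>| tx.send(chunk).is_ok();

    let writer = thread::spawn(move || {
        let mut total = 0;
        // Drain until the zero-length EOF marker, like write_file_incremental.
        while let Ok(chunk) = rx.recv() {
            if chunk.is_empty() {
                break;
            }
            total += chunk.len();
        }
        total
    });

    assert!(chunk_submit(vec![0u8; 8]));
    assert!(chunk_submit(Vec::new())); // EOF
    assert_eq!(writer.join().unwrap(), 8);
    // Once the writer exits, the receiver is dropped, and the submit closure
    // reports the disconnect instead of panicking:
    assert!(!chunk_submit(vec![1, 2, 3]));
}
```
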
 
 /// Trait object for performing IO. At this point the overhead
@@ -122,7 +236,7 @@ pub trait Executor {
     /// During overload situations previously queued items may
     /// need to be completed before the item is accepted:
     /// consume the returned iterator.
-    fn execute(&self, mut item: Item) -> Box<dyn Iterator<Item = Item> + '_> {
+    fn execute(&self, mut item: Item) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
         item.start = Some(Instant::now());
         self.dispatch(item)
     }
@@ -130,26 +244,35 @@ pub trait Executor {
     /// Actually dispatch an operation.
     /// This is called by the default execute() implementation and
     /// should not be called directly.
-    fn dispatch(&self, item: Item) -> Box<dyn Iterator<Item = Item> + '_>;
+    fn dispatch(&self, item: Item) -> Box<dyn Iterator<Item = CompletedIo> + '_>;
 
     /// Wrap up any pending operations and iterate over them.
     /// All operations submitted before the join will have been
     /// returned either through ready/complete or join once join
     /// returns.
-    fn join(&mut self) -> Box<dyn Iterator<Item = Item> + '_>;
+    fn join(&mut self) -> Box<dyn Iterator<Item = CompletedIo> + '_>;
 
     /// Iterate over completed items.
-    fn completed(&self) -> Box<dyn Iterator<Item = Item> + '_>;
+    fn completed(&self) -> Box<dyn Iterator<Item = CompletedIo> + '_>;
+
+    /// Get any state needed for incremental file processing
+    fn incremental_file_state(&self) -> IncrementalFileState;
 }
 
 /// Trivial single threaded IO to be used from executors.
 /// (Crazy sophisticated ones can obviously ignore this)
-pub fn perform(item: &mut Item) {
+pub fn perform<F: Fn(usize)>(item: &mut Item, chunk_complete_callback: F) {
     // directories: make them, TODO: register with the dir existence cache.
     // Files, write them.
-    item.result = match item.kind {
+    item.result = match &mut item.kind {
         Kind::Directory => create_dir(&item.full_path),
         Kind::File(ref contents) => write_file(&item.full_path, &contents, item.mode),
+        Kind::IncrementalFile(incremental_file) => write_file_incremental(
+            &item.full_path,
+            incremental_file,
+            item.mode,
+            chunk_complete_callback,
+        ),
     };
     item.finish = item
         .start
@@ -187,6 +310,51 @@ pub fn write_file<P: AsRef<Path>, C: AsRef<[u8]>>(
     Ok(())
 }
 
+#[allow(unused_variables)]
+pub fn write_file_incremental<P: AsRef<Path>, F: Fn(usize)>(
+    path: P,
+    content_callback: &mut IncrementalFile,
+    mode: u32,
+    chunk_complete_callback: F,
+) -> io::Result<()> {
+    let mut opts = OpenOptions::new();
+    #[cfg(unix)]
+    {
+        use std::os::unix::fs::OpenOptionsExt;
+        opts.mode(mode);
+    }
+    let path = path.as_ref();
+    let path_display = format!("{}", path.display());
+    let mut f = {
+        trace_scoped!("creat", "name": path_display);
+        opts.write(true).create(true).truncate(true).open(path)?
+    };
+    if let IncrementalFile::ThreadedReceiver(recv) = content_callback {
+        loop {
+            // We unwrap here because the only documented reason for recv to fail is a close by the sender, which is
+            // reading from the tar file: a failed read there will propagate the error in the main thread directly.
+            let contents = recv.recv().unwrap();
+            let len = contents.len();
+            // Length 0 vector is used for clean EOF signalling.
+            if len == 0 {
+                break;
+            }
+            {
+                trace_scoped!("write_segment", "name": path_display, "len": len);
+                f.write_all(&contents)?;
+                chunk_complete_callback(len);
+            }
+        }
+    } else {
+        unreachable!();
+    }
+    {
+        trace_scoped!("close", "name:": path_display);
+        drop(f);
+    }
+    Ok(())
+}
+
 pub fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
     let path = path.as_ref();
     let path_display = format!("{}", path.display());
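
`write_file_incremental` relies on one convention used throughout this patch: read at most a chunk, shrink the buffer to the bytes actually read, and treat an empty buffer as EOF. A minimal reader-side sketch of that convention (the 4-byte chunk size is purely for demonstration; the patch uses 16 MiB):

```rust
use std::io::Read;

const CHUNK: usize = 4; // illustrative; the patch's IO_CHUNK_SIZE is 16_777_216

fn chunks<R: Read>(mut src: R) -> std::io::Result<Vec<Vec<u8>>> {
    let mut out = Vec::new();
    loop {
        let mut v = vec![0; CHUNK];
        let len = src.read(&mut v)?;
        v.resize(len, 0); // keep only the bytes actually read
        let done = v.is_empty();
        out.push(v); // the final empty chunk doubles as the EOF marker
        if done {
            return Ok(out);
        }
    }
}

fn main() -> std::io::Result<()> {
    let data: &[u8] = b"0123456789";
    let got = chunks(data)?;
    assert_eq!(got.len(), 4); // 4 + 4 + 2 bytes, then the empty EOF chunk
    assert!(got.last().unwrap().is_empty());
    Ok(())
}
```
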
diff --git a/src/diskio/test.rs b/src/diskio/test.rs
new file mode 100644
index 0000000000..53c437f62b
--- /dev/null
+++ b/src/diskio/test.rs
@@ -0,0 +1,79 @@
+use std::collections::HashMap;
+
+use crate::errors::Result;
+use crate::test::test_dir;
+
+use super::{get_executor, Executor, Item};
+use crate::currentprocess;
+
+fn test_incremental_file(io_threads: &str) -> Result<()> {
+    let work_dir = test_dir()?;
+    let mut vars = HashMap::new();
+    vars.insert("RUSTUP_IO_THREADS".to_string(), io_threads.to_string());
+    let tp = Box::new(currentprocess::TestProcess {
+        vars,
+        ..Default::default()
+    });
+    currentprocess::with(tp.clone(), || -> Result<()> {
+        let mut written = 0;
+        let mut file_finished = false;
+        let mut io_executor: Box<dyn Executor> = get_executor(None)?;
+        let (item, mut sender) = Item::write_file_segmented(
+            work_dir.path().join("scratch"),
+            0o666,
+            io_executor.incremental_file_state(),
+        )?;
+        for _ in io_executor.execute(item).collect::<Vec<_>>() {
+            // The file should be open and incomplete, and no completed chunks
+            unreachable!();
+        }
+        let mut chunk: Vec<u8> = vec![];
+        chunk.extend(b"0123456789");
+        // We should be able to submit more than one chunk
+        sender(chunk.clone());
+        sender(chunk.clone());
+        loop {
+            for work in io_executor.completed().collect::<Vec<_>>() {
+                match work {
+                    super::CompletedIo::Chunk(size) => written += size,
+                    super::CompletedIo::Item(item) => unreachable!(format!("{:?}", item)),
+                }
+            }
+            if written == 20 {
+                break;
+            }
+        }
+        // sending a zero length chunk closes the file
+        sender(vec![]);
+        loop {
+            for work in io_executor.completed().collect::<Vec<_>>() {
+                match work {
+                    super::CompletedIo::Chunk(_) => unreachable!(),
+                    super::CompletedIo::Item(_) => {
+                        file_finished = true;
+                    }
+                }
+            }
+            if file_finished {
+                break;
+            }
+        }
+        assert!(file_finished);
+        for _ in io_executor.join().collect::<Vec<_>>() {
+            // no more work should be outstanding
+            unreachable!();
+        }
+        Ok(())
+    })?;
+    Ok(())
+}
+
+#[test]
+fn test_incremental_file_immediate() -> Result<()> {
+    test_incremental_file("1")
+}
+
+#[test]
+fn test_incremental_file_threaded() -> Result<()> {
+    test_incremental_file("2")
+}
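
The two tests above pick the executor through `RUSTUP_IO_THREADS`: `"1"` exercises the immediate path and `"2"` the thread pool. A sketch of that dispatch, with invented names standing in for whatever `get_executor` actually does:

```rust
#[derive(Debug, PartialEq)]
enum IoModel {
    Immediate,
    Threaded(usize),
}

// Stand-in for the env-driven choice the tests rely on; not rustup's
// get_executor, just the decision it encodes.
fn pick_model(var: Option<&str>) -> IoModel {
    match var.map(str::parse::<usize>) {
        Some(Ok(1)) => IoModel::Immediate,
        Some(Ok(n)) if n > 1 => IoModel::Threaded(n),
        _ => IoModel::Threaded(default_thread_count()),
    }
}

fn default_thread_count() -> usize {
    4 // stand-in for a CPU-count probe
}

fn main() {
    assert_eq!(pick_model(Some("1")), IoModel::Immediate);
    assert_eq!(pick_model(Some("2")), IoModel::Threaded(2));
    assert_eq!(pick_model(None), IoModel::Threaded(4));
}
```
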
diff --git a/src/diskio/threaded.rs b/src/diskio/threaded.rs
index 8fb4ea586d..80039473ef 100644
--- a/src/diskio/threaded.rs
+++ b/src/diskio/threaded.rs
@@ -4,17 +4,17 @@
 /// than desired. In particular the docs workload with 20K files requires
 /// very low latency per file, which even a few ms per syscall per file
 /// will cause minutes of wall clock time.
-use super::{perform, Executor, Item};
-use crate::utils::notifications::Notification;
-use crate::utils::units::Unit;
-
 use std::cell::Cell;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::mpsc::{channel, Receiver, Sender};
 use std::sync::Arc;
 
+use super::{perform, CompletedIo, Executor, Item};
+use crate::utils::notifications::Notification;
+use crate::utils::units::Unit;
+
 enum Task {
-    Request(Item),
+    Request(CompletedIo),
     // Used to synchronise in the join method.
     Sentinel,
 }
@@ -59,27 +59,31 @@ impl<'a> Threaded<'a> {
         self.n_files.fetch_add(1, Ordering::Relaxed);
         let n_files = self.n_files.clone();
         self.pool.execute(move || {
-            perform(&mut item);
+            let chunk_complete_callback = |size| {
+                tx.send(Task::Request(CompletedIo::Chunk(size)))
+                    .expect("receiver should be listening")
+            };
+            perform(&mut item, chunk_complete_callback);
             n_files.fetch_sub(1, Ordering::Relaxed);
-            tx.send(Task::Request(item))
+            tx.send(Task::Request(CompletedIo::Item(item)))
                 .expect("receiver should be listening");
         });
     }
 }
 
 impl<'a> Executor for Threaded<'a> {
-    fn dispatch(&self, item: Item) -> Box<dyn Iterator<Item = Item> + '_> {
+    fn dispatch(&self, item: Item) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
         // Yield any completed work before accepting new work - keep memory
         // pressure under control
         // - return an iterator that runs until we can submit and then submits
         //   as its last action
         Box::new(SubmitIterator {
             executor: self,
-            item: Cell::new(Task::Request(item)),
+            item: Cell::new(Some(item)),
         })
     }
 
-    fn join(&mut self) -> Box<dyn Iterator<Item = Item> + '_> {
+    fn join(&mut self) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
         // Some explanation is in order. Even though the tar we are reading from (if
         // any) will have had its FileWithProgress download tracking
         // completed before we hit drop, that is not true if we are unwinding due to a
@@ -145,12 +149,16 @@ impl<'a> Executor for Threaded<'a> {
         })
     }
 
-    fn completed(&self) -> Box<dyn Iterator<Item = Item> + '_> {
+    fn completed(&self) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
         Box::new(JoinIterator {
             iter: self.rx.try_iter(),
             consume_sentinel: true,
         })
     }
+
+    fn incremental_file_state(&self) -> super::IncrementalFileState {
+        super::IncrementalFileState::Threaded
+    }
 }
 
 impl<'a> Drop for Threaded<'a> {
@@ -166,9 +174,9 @@ struct JoinIterator<T: Iterator<Item = Task>> {
 }
 
 impl<T: Iterator<Item = Task>> Iterator for JoinIterator<T> {
-    type Item = Item;
+    type Item = CompletedIo;
 
-    fn next(&mut self) -> Option<Item> {
+    fn next(&mut self) -> Option<CompletedIo> {
         let task_o = self.iter.next();
         match task_o {
             None => None,
@@ -188,13 +196,13 @@ impl<T: Iterator<Item = Task>> Iterator for JoinIterator<T> {
 
 struct SubmitIterator<'a, 'b> {
     executor: &'a Threaded<'b>,
-    item: Cell<Task>,
+    item: Cell<Option<Item>>,
 }
 
 impl<'a, 'b> Iterator for SubmitIterator<'a, 'b> {
-    type Item = Item;
+    type Item = CompletedIo;
 
-    fn next(&mut self) -> Option<Item> {
+    fn next(&mut self) -> Option<CompletedIo> {
         // The number here is arbitrary; just a number to stop exhausting fd's on linux
         // and still allow rapid decompression to generate work to dispatch
         // This function could perhaps be tuned: e.g. it may wait in rx.iter()
@@ -203,7 +211,7 @@ impl<'a, 'b> Iterator for SubmitIterator<'a, 'b> {
         //   actually completes; however, results are presently ok.
         let threshold = 5;
         if self.executor.pool.queued_count() < threshold {
-            if let Task::Request(item) = self.item.take() {
+            if let Some(item) = self.item.take() {
                 self.executor.submit(item);
             };
             None
@@ -213,7 +221,7 @@ impl<'a, 'b> Iterator for SubmitIterator<'a, 'b> {
                 return Some(item);
             }
             if self.executor.pool.queued_count() < threshold {
-                if let Task::Request(item) = self.item.take() {
+                if let Some(item) = self.item.take() {
                     self.executor.submit(item);
                 };
                 return None;
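
`SubmitIterator` holds exactly one pending item and only hands it to the pool once `queued_count()` drops below the threshold, surfacing completed work in the meantime. A toy model of that backpressure loop (the `Pool` here is just a counter, not the real threadpool type):

```rust
// Invented stand-in for the pool: only tracks how much work is queued.
struct Pool {
    queued: usize,
}

impl Pool {
    fn submit(&mut self, _work: &str) {
        self.queued += 1;
    }
    fn complete_one(&mut self) -> Option<&'static str> {
        if self.queued > 0 {
            self.queued -= 1;
            Some("completed")
        } else {
            None
        }
    }
}

// Mirror of SubmitIterator::next(): drain completions until there is room,
// then submit the single held item.
fn submit_with_backpressure(pool: &mut Pool, item: &str, threshold: usize) -> Vec<&'static str> {
    let mut yielded = Vec::new();
    while pool.queued >= threshold {
        yielded.extend(pool.complete_one());
    }
    pool.submit(item);
    yielded
}

fn main() {
    let mut pool = Pool { queued: 6 };
    let yielded = submit_with_backpressure(&mut pool, "new-item", 5);
    assert_eq!(yielded.len(), 2); // drained 6 -> 4, then submitted -> 5
    assert_eq!(pool.queued, 5);
}
```
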
diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs
index 3888b5f876..d7cb5ace7b 100644
--- a/src/dist/component/package.rs
+++ b/src/dist/component/package.rs
@@ -10,7 +10,7 @@ use std::path::{Path, PathBuf};
 
 use tar::EntryType;
 
-use crate::diskio::{get_executor, Executor, Item, Kind};
+use crate::diskio::{get_executor, CompletedIo, Executor, Item, Kind};
 use crate::dist::component::components::*;
 use crate::dist::component::transaction::*;
 use crate::dist::temp;
@@ -165,7 +165,7 @@ struct MemoryBudget {
 // Probably this should live in diskio but ¯\_(ツ)_/¯
 impl MemoryBudget {
     fn new(
-        max_file_size: usize,
+        io_chunk_size: usize,
         effective_max_ram: Option<usize>,
         notify_handler: Option<&dyn Fn(Notification<'_>)>,
     ) -> Self {
@@ -195,30 +195,38 @@ impl MemoryBudget {
             }
         };
 
-        // Future us: this can be removed when IO chunking within a single file is possible: it just helps generate good
-        // errors rather than allocator-failure panics when we hit the large file on a RAM limited system.
-        if max_file_size > unpack_ram {
-            panic!("RUSTUP_UNPACK_RAM must be larger than {}", max_file_size);
+        if io_chunk_size > unpack_ram {
+            panic!("RUSTUP_UNPACK_RAM must be larger than {}", io_chunk_size);
         }
         Self {
             limit: unpack_ram,
             used: 0,
         }
     }
-    fn reclaim(&mut self, op: &Item) {
-        match &op.kind {
-            Kind::Directory => {}
-            Kind::File(content) => self.used -= content.len(),
-        };
+
+    fn reclaim(&mut self, op: &CompletedIo) {
+        match &op {
+            CompletedIo::Item(op) => match &op.kind {
+                Kind::Directory => {}
+                Kind::File(content) => self.used -= content.len(),
+                Kind::IncrementalFile(_) => {}
+            },
+            CompletedIo::Chunk(size) => self.used -= size,
+        }
     }
 
     fn claim(&mut self, op: &Item) {
         match &op.kind {
             Kind::Directory => {}
             Kind::File(content) => self.used += content.len(),
+            Kind::IncrementalFile(_) => {}
         };
     }
 
+    fn claim_chunk(&mut self, len: usize) {
+        self.used += len;
+    }
+
     fn available(&self) -> usize {
         self.limit - self.used
     }
@@ -226,23 +234,27 @@ impl MemoryBudget {
 
 /// Handle the async result of io operations
 /// Replaces op.result with Ok(())
-fn filter_result(op: &mut Item) -> io::Result<()> {
-    let result = mem::replace(&mut op.result, Ok(()));
-    match result {
-        Ok(_) => Ok(()),
-        Err(e) => match e.kind() {
-            IOErrorKind::AlreadyExists => {
-                // mkdir of e.g. ~/.rustup already existing is just fine;
-                // for others it would be better to know whether it is
-                // expected to exist or not -so put a flag in the state.
-                if let Kind::Directory = op.kind {
-                    Ok(())
-                } else {
-                    Err(e)
+fn filter_result(op: &mut CompletedIo) -> io::Result<()> {
+    if let CompletedIo::Item(op) = op {
+        let result = mem::replace(&mut op.result, Ok(()));
+        match result {
+            Ok(_) => Ok(()),
+            Err(e) => match e.kind() {
+                IOErrorKind::AlreadyExists => {
+                    // mkdir of e.g. ~/.rustup already existing is just fine;
+                    // for others it would be better to know whether it is
+                    // expected to exist or not -so put a flag in the state.
+                    if let Kind::Directory = op.kind {
+                        Ok(())
+                    } else {
+                        Err(e)
+                    }
                 }
-            }
-            _ => Err(e),
-        },
+                _ => Err(e),
+            },
+        }
+    } else {
+        Ok(())
     }
 }
 
@@ -251,32 +263,35 @@ fn filter_result(op: &mut Item) -> io::Result<()> {
 ///
 /// Currently the volume of queued items does not count as backpressure against
 /// the main tar extraction process.
+/// Returns the number of triggered children
 fn trigger_children(
     io_executor: &dyn Executor,
     directories: &mut HashMap<PathBuf, DirStatus>,
     budget: &mut MemoryBudget,
-    item: Item,
+    op: CompletedIo,
 ) -> Result<usize> {
     let mut result = 0;
-    if let Kind::Directory = item.kind {
-        let mut pending = Vec::new();
-        directories
-            .entry(item.full_path)
-            .and_modify(|status| match status {
-                DirStatus::Exists => unreachable!(),
-                DirStatus::Pending(pending_inner) => {
-                    pending.append(pending_inner);
-                    *status = DirStatus::Exists;
+    if let CompletedIo::Item(item) = op {
+        if let Kind::Directory = item.kind {
+            let mut pending = Vec::new();
+            directories
+                .entry(item.full_path)
+                .and_modify(|status| match status {
+                    DirStatus::Exists => unreachable!(),
+                    DirStatus::Pending(pending_inner) => {
+                        pending.append(pending_inner);
+                        *status = DirStatus::Exists;
+                    }
+                })
+                .or_insert_with(|| unreachable!());
+            result += pending.len();
+            for pending_item in pending.into_iter() {
+                for mut item in io_executor.execute(pending_item).collect::<Vec<_>>() {
+                    // TODO capture metrics
+                    budget.reclaim(&item);
+                    filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
+                    result += trigger_children(io_executor, directories, budget, item)?;
                 }
-            })
-            .or_insert_with(|| unreachable!());
-        result += pending.len();
-        for pending_item in pending.into_iter() {
-            for mut item in io_executor.execute(pending_item).collect::<Vec<_>>() {
-                // TODO capture metrics
-                budget.reclaim(&item);
-                filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
-                result += trigger_children(io_executor, directories, budget, item)?;
             }
         }
     };
@@ -298,7 +313,7 @@ fn unpack_without_first_dir<'a, R: Read>(
     let entries = archive
         .entries()
         .chain_err(|| ErrorKind::ExtractingPackage)?;
-    const MAX_FILE_SIZE: u64 = 220_000_000;
+    const IO_CHUNK_SIZE: u64 = 16_777_216;
     let effective_max_ram = match effective_limits::memory_limit() {
         Ok(ram) => Some(ram as usize),
         Err(e) => {
@@ -308,7 +323,7 @@ fn unpack_without_first_dir<'a, R: Read>(
             None
         }
     };
-    let mut budget = MemoryBudget::new(MAX_FILE_SIZE as usize, effective_max_ram, notify_handler);
+    let mut budget = MemoryBudget::new(IO_CHUNK_SIZE as usize, effective_max_ram, notify_handler);
 
     let mut directories: HashMap<PathBuf, DirStatus> = HashMap::new();
     // Path is presumed to exist. Call it a precondition.
@@ -349,36 +364,44 @@ fn unpack_without_first_dir<'a, R: Read>(
             continue;
         }
 
-        let size = entry.header().size()?;
-        if size > MAX_FILE_SIZE {
-            // If we cannot tell the user we will either succeed (great), or fail (and we may get a bug report), either
-            // way, we will most likely get reports from users about this, so the possible set of custom builds etc that
-            // don't report are not a great concern.
-            if let Some(notify_handler) = notify_handler {
-                notify_handler(Notification::Error(format!(
-                    "File too big {} {}",
-                    relpath.display(),
-                    size
-                )));
-            }
-        }
-
-        fn flush_ios(
+        /// true if either no sender_entry was provided, or the incremental file
+        /// has been fully dispatched.
+        fn flush_ios<'a, R: std::io::Read, P: AsRef<Path>>(
             mut budget: &mut MemoryBudget,
             io_executor: &dyn Executor,
             mut directories: &mut HashMap<PathBuf, DirStatus>,
-        ) -> Result<()> {
-            for mut item in io_executor.completed().collect::<Vec<_>>() {
+            mut sender_entry: Option<&mut (
+                Box<dyn FnMut(Vec<u8>) -> bool + 'a>,
+                &mut tar::Entry<'_, R>,
+            )>,
+            full_path: P,
+        ) -> Result<bool> {
+            let mut result = sender_entry.is_none();
+            for mut op in io_executor.completed().collect::<Vec<_>>() {
                 // TODO capture metrics
-                budget.reclaim(&item);
-                filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
-                trigger_children(&*io_executor, &mut directories, &mut budget, item)?;
+                budget.reclaim(&op);
+                filter_result(&mut op).chain_err(|| ErrorKind::ExtractingPackage)?;
+                trigger_children(&*io_executor, &mut directories, &mut budget, op)?;
             }
-            Ok(())
-        }
-
-        while size > budget.available() as u64 {
-            flush_ios(&mut budget, &*io_executor, &mut directories)?;
+            // Maybe stream a file incrementally
+            if let Some((sender, entry)) = sender_entry.as_mut() {
+                if budget.available() as u64 >= IO_CHUNK_SIZE {
+                    let mut v = vec![0; IO_CHUNK_SIZE as usize];
+                    let len = entry.read(&mut v)?;
+                    if len == 0 {
+                        result = true;
+                    }
+                    v.resize(len, 0);
+                    budget.claim_chunk(len);
+                    if !sender(v) {
+                        return Err(ErrorKind::DisconnectedChannel(
+                            full_path.as_ref().to_path_buf(),
+                        )
+                        .into());
+                    }
+                }
+            }
+            Ok(result)
         }
 
         // Bail out if we get hard links, device nodes or any other unusual content
@@ -409,15 +432,39 @@ fn unpack_without_first_dir<'a, R: Read>(
         let o_mode = g_mode >> 3;
         let mode = u_mode | g_mode | o_mode;
 
+        let file_size = entry.header().size()?;
+        let size = std::cmp::min(IO_CHUNK_SIZE, file_size);
+
+        while size > budget.available() as u64 {
+            flush_ios::<tar::Entry<'_, R>, _>(
+                &mut budget,
+                &*io_executor,
+                &mut directories,
+                None,
+                &full_path,
+            )?;
+        }
+
+        let mut incremental_file_sender: Option<Box<dyn FnMut(Vec<u8>) -> bool + '_>> = None;
         let mut item = match kind {
             EntryType::Directory => {
                 directories.insert(full_path.to_owned(), DirStatus::Pending(Vec::new()));
-                Item::make_dir(full_path, mode)
+                Item::make_dir(full_path.clone(), mode)
             }
             EntryType::Regular => {
-                let mut v = Vec::with_capacity(size as usize);
-                entry.read_to_end(&mut v)?;
-                Item::write_file(full_path, v, mode)
+                if file_size > IO_CHUNK_SIZE {
+                    let (item, sender) = Item::write_file_segmented(
+                        full_path.clone(),
+                        mode,
+                        io_executor.incremental_file_state(),
+                    )?;
+                    incremental_file_sender = Some(sender);
+                    item
+                } else {
+                    let mut v = Vec::with_capacity(size as usize);
+                    entry.read_to_end(&mut v)?;
+                    Item::write_file(full_path.clone(), v, mode)
+                }
             }
             _ => return Err(ErrorKind::UnsupportedKind(format!("{:?}", kind)).into()),
         };
@@ -456,12 +503,25 @@ fn unpack_without_first_dir<'a, R: Read>(
             }
         };
 
+        // Submit the new item
         for mut item in io_executor.execute(item).collect::<Vec<_>>() {
             // TODO capture metrics
             budget.reclaim(&item);
             filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
             trigger_children(&*io_executor, &mut directories, &mut budget, item)?;
         }
+
+        let mut incremental_file_sender = incremental_file_sender
+            .map(|incremental_file_sender| (incremental_file_sender, &mut entry));
+
+        // monitor io queue and feed in the content of the file (if needed)
+        while !flush_ios(
+            &mut budget,
+            &*io_executor,
+            &mut directories,
+            incremental_file_sender.as_mut(),
+            &full_path,
+        )? {}
     }
 
     loop {
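
The budget arithmetic driving `flush_ios` is small enough to state on its own: claim before dispatch, reclaim on completion, and flush IO whenever a full chunk no longer fits. A reduced sketch of that invariant (the real `MemoryBudget` also tracks whole `Item`s, not just chunks):

```rust
struct MemoryBudget {
    limit: usize,
    used: usize,
}

impl MemoryBudget {
    fn claim_chunk(&mut self, len: usize) {
        assert!(len <= self.available()); // callers flush before claiming
        self.used += len;
    }
    fn reclaim_chunk(&mut self, len: usize) {
        self.used -= len;
    }
    fn available(&self) -> usize {
        self.limit - self.used
    }
}

fn main() {
    const IO_CHUNK_SIZE: usize = 16_777_216;
    let mut budget = MemoryBudget {
        limit: 2 * IO_CHUNK_SIZE,
        used: 0,
    };
    budget.claim_chunk(IO_CHUNK_SIZE); // first chunk in flight
    budget.claim_chunk(IO_CHUNK_SIZE); // budget now exhausted
    assert_eq!(budget.available(), 0); // this is where flush_ios would spin
    budget.reclaim_chunk(IO_CHUNK_SIZE); // a write completed
    assert_eq!(budget.available(), IO_CHUNK_SIZE);
}
```
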
diff --git a/src/dist/component/transaction.rs b/src/dist/component/transaction.rs
index 45fe33c41d..8a70d03871 100644
--- a/src/dist/component/transaction.rs
+++ b/src/dist/component/transaction.rs
@@ -229,16 +229,12 @@ impl<'a> ChangedItem<'a> {
         }
         Ok(())
     }
-    fn dest_abs_path(
-        prefix: &InstallPrefix,
-        component: &str,
-        relpath: &PathBuf,
-    ) -> Result<PathBuf> {
+    fn dest_abs_path(prefix: &InstallPrefix, component: &str, relpath: &Path) -> Result<PathBuf> {
         let abs_path = prefix.abs_path(relpath);
         if utils::path_exists(&abs_path) {
             Err(ErrorKind::ComponentConflict {
                 name: component.to_owned(),
-                path: relpath.clone(),
+                path: relpath.to_path_buf(),
             }
             .into())
         } else {
diff --git a/src/dist/dist.rs b/src/dist/dist.rs
index 453ea3b6d5..eba08ab200 100644
--- a/src/dist/dist.rs
+++ b/src/dist/dist.rs
@@ -407,13 +407,13 @@ impl<'a> Manifest<'a> {
         ext: &str,
     ) -> Result<Option<String>> {
         let suffix = target_triple.to_owned() + ext;
-        Ok(utils::match_file("manifest", &self.0, |line| {
+        utils::match_file("manifest", &self.0, |line| {
             if line.starts_with(package) && line.ends_with(&suffix) {
                 Some(format!("{}/{}", &self.1, line))
             } else {
                 None
             }
-        })?)
+        })
     }
 }
 
diff --git a/src/dist/download.rs b/src/dist/download.rs
index 48d5eb1b73..e584f6a38a 100644
--- a/src/dist/download.rs
+++ b/src/dist/download.rs
@@ -132,7 +132,7 @@ impl<'a> DownloadCfg<'a> {
             (self.notify_handler)(n.into())
         })?;
 
-        Ok(utils::read_file("hash", &hash_file).map(|s| s[0..64].to_owned())?)
+        utils::read_file("hash", &hash_file).map(|s| s[0..64].to_owned())
     }
 
     fn download_signature(&self, url: &str) -> Result<String> {
@@ -143,7 +143,7 @@ impl<'a> DownloadCfg<'a> {
             (self.notify_handler)(n.into())
         })?;
 
-        Ok(utils::read_file("signature", &sig_file)?)
+        utils::read_file("signature", &sig_file)
     }
 
     fn check_signature(&self, url: &str, file: &temp::File<'_>) -> Result<&PgpPublicKey> {
diff --git a/src/errors.rs b/src/errors.rs
index 2e43e86b42..dafc69c9ed 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -106,6 +106,10 @@ error_chain! {
             description("could not download file")
             display("could not download file from '{}' to '{}'", url, path.display())
         }
+        DisconnectedChannel(v: PathBuf) {
+            description("IO channel disconnected")
+            display("IO receiver for '{}' disconnected", v.display())
+        }
         InvalidUrl {
             url: String,
         } {
diff --git a/src/lib.rs b/src/lib.rs
index 2c0d2e6900..55d8679753 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,5 +1,9 @@
 #![deny(rust_2018_idioms)]
-#![allow(clippy::too_many_arguments)]
+#![allow(
+    clippy::too_many_arguments,
+    clippy::type_complexity,
+    clippy::upper_case_acronyms // see https://github.com/rust-lang/rust-clippy/issues/6974
+)]
 #![recursion_limit = "1024"]
 
 pub use crate::config::*;
diff --git a/src/toolchain.rs b/src/toolchain.rs
index b2931b565c..2bc0bd0ec1 100644
--- a/src/toolchain.rs
+++ b/src/toolchain.rs
@@ -88,14 +88,14 @@ impl<'a> Toolchain<'a> {
         // Perform minimal validation; there should at least be a `bin/` that might
         // contain things for us to run.
         if !path.join("bin").is_dir() {
-            return Err(ErrorKind::InvalidToolchainPath(path.into()).into());
+            return Err(ErrorKind::InvalidToolchainPath(path).into());
         }
 
         Ok(Toolchain {
             cfg,
             name: utils::canonicalize_path(&path, cfg.notify_handler.as_ref())
                 .to_str()
-                .ok_or_else(|| ErrorKind::InvalidToolchainPath(path.clone().into()))?
+                .ok_or_else(|| ErrorKind::InvalidToolchainPath(path.clone()))?
                 .to_owned(),
             path,
             dist_handler: Box::new(move |n| (cfg.notify_handler)(n.into())),
@@ -610,7 +610,7 @@ impl<'a> DistributableToolchain<'a> {
 
     // Installed and not-installed?
     pub fn desc(&self) -> Result<ToolchainDesc> {
-        Ok(ToolchainDesc::from_str(&self.0.name)?)
+        ToolchainDesc::from_str(&self.0.name)
     }
 
     fn download_cfg(&self) -> DownloadCfg<'_> {
@@ -917,7 +917,7 @@ impl<'a> DistributableToolchain<'a> {
 
     // Installed only.
     fn update_hash(&self) -> Result<PathBuf> {
-        Ok(self.0.cfg.get_hash_file(&self.0.name, true)?)
+        self.0.cfg.get_hash_file(&self.0.name, true)
     }
 }
 
diff --git a/src/utils/raw.rs b/src/utils/raw.rs
index 0b20e253ee..6e330bae1c 100644
--- a/src/utils/raw.rs
+++ b/src/utils/raw.rs
@@ -177,6 +177,7 @@ fn symlink_junction_inner(target: &Path, junction: &Path) -> io::Result<()> {
     const MAXIMUM_REPARSE_DATA_BUFFER_SIZE: usize = 16 * 1024;
 
     #[repr(C)]
+    #[allow(non_snake_case)]
     pub struct REPARSE_MOUNTPOINT_DATA_BUFFER {
         pub ReparseTag: DWORD,
         pub ReparseDataLength: DWORD,
diff --git a/src/utils/utils.rs b/src/utils/utils.rs
index 2fc25901fc..74fc61262e 100644
--- a/src/utils/utils.rs
+++ b/src/utils/utils.rs
@@ -450,6 +450,7 @@ pub fn file_size(path: &Path) -> Result<u64> {
 }
 
 pub fn make_executable(path: &Path) -> Result<()> {
+    #[allow(clippy::unnecessary_wraps)]
     #[cfg(windows)]
     fn inner(_: &Path) -> Result<()> {
         Ok(())
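
Several hunks above (dist.rs, download.rs, toolchain.rs) apply one mechanical cleanup: when a function ends in `Ok(expr?)` and the error types already line up, the wrapper is redundant. A minimal demonstration, unrelated to rustup's own types:

```rust
fn inner() -> Result<u64, std::num::ParseIntError> {
    "64".parse()
}

fn wrapped() -> Result<u64, std::num::ParseIntError> {
    Ok(inner()?) // unwraps and immediately rewraps the same Result
}

fn direct() -> Result<u64, std::num::ParseIntError> {
    inner() // equivalent, and the form the diff rewrites toward
}

fn main() {
    assert_eq!(wrapped().unwrap(), direct().unwrap());
}
```
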