Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream large files during unpacking #2707

Merged
merged 2 commits into from
Apr 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cli/self_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ pub fn install(

fn rustc_or_cargo_exists_in_path() -> Result<()> {
// Ignore rustc and cargo if present in $HOME/.cargo/bin or a few other directories
#[allow(clippy::ptr_arg)]
fn ignore_paths(path: &PathBuf) -> bool {
!path
.components()
Expand Down
1 change: 1 addition & 0 deletions src/cli/self_update/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ fn _apply_new_path(new_path: Option<Vec<u16>>) -> Result<()> {
}

// Tell other processes to update their environment
#[allow(clippy::unnecessary_cast)]
unsafe {
SendMessageTimeoutA(
HWND_BROADCAST,
Expand Down
2 changes: 1 addition & 1 deletion src/cli/topical_doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn index_html(doc: &DocData<'_>, wpath: &Path) -> Option<PathBuf> {
}
}

fn dir_into_vec(dir: &PathBuf) -> Result<Vec<OsString>> {
fn dir_into_vec(dir: &Path) -> Result<Vec<OsString>> {
let entries = fs::read_dir(dir).chain_err(|| format!("Opening directory {:?}", dir))?;
let mut v = Vec::new();
for entry in entries {
Expand Down
7 changes: 3 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ impl<'a> OverrideCfg<'a> {
|| file.toolchain.components.is_some()
|| file.toolchain.profile.is_some()
{
return Err(ErrorKind::CannotSpecifyPathAndOptions(path.into()).into());
return Err(ErrorKind::CannotSpecifyPathAndOptions(path).into());
}
Some(Toolchain::from_path(cfg, cfg_path, &path)?)
}
(Some(channel), Some(path)) => {
return Err(ErrorKind::CannotSpecifyChannelAndPath(channel, path.into()).into())
return Err(ErrorKind::CannotSpecifyChannelAndPath(channel, path).into())
}
(None, None) => None,
},
Expand Down Expand Up @@ -172,8 +172,7 @@ impl PgpPublicKey {
Ok(ret)
}
use pgp::types::KeyTrait;
let mut ret = Vec::new();
ret.push(format!("from {}", self));
let mut ret = vec![format!("from {}", self)];
let key = self.key();
let keyid = format_hex(&key.key_id().to_vec(), "-", 4)?;
let algo = key.algorithm();
Expand Down
199 changes: 188 additions & 11 deletions src/diskio/immediate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,204 @@
///
/// Use for diagnosing bugs or working around any unexpected issues with the
/// threaded code paths.
use super::{perform, Executor, Item};
use std::{
fmt::Debug,
fs::{File, OpenOptions},
io::{self, Write},
path::Path,
sync::{Arc, Mutex},
time::Instant,
};

use super::{CompletedIo, Executor, Item};

#[derive(Debug)]
pub struct _IncrementalFileState {
completed_chunks: Vec<usize>,
err: Option<io::Result<()>>,
item: Option<Item>,
finished: bool,
}

pub(super) type IncrementalFileState = Arc<Mutex<Option<_IncrementalFileState>>>;

#[derive(Default, Debug)]
pub struct ImmediateUnpacker {
incremental_state: IncrementalFileState,
}

#[derive(Default)]
pub struct ImmediateUnpacker {}
impl ImmediateUnpacker {
pub fn new() -> Self {
Self {}
Self {
..Default::default()
}
}

fn deque(&self) -> Box<dyn Iterator<Item = CompletedIo>> {
let mut guard = self.incremental_state.lock().unwrap();
// incremental file in progress
if let Some(ref mut state) = *guard {
// Case 1: pending errors
if state.finished {
let mut item = state.item.take().unwrap();
if state.err.is_some() {
let err = state.err.take().unwrap();
item.result = err;
}
item.finish = item
.start
.map(|s| Instant::now().saturating_duration_since(s));
if state.finished {
*guard = None;
}
Box::new(Some(CompletedIo::Item(item)).into_iter())
} else {
// Case 2: pending chunks (which might be empty)
let mut completed_chunks = vec![];
completed_chunks.append(&mut state.completed_chunks);
Box::new(completed_chunks.into_iter().map(CompletedIo::Chunk))
}
} else {
Box::new(None.into_iter())
}
}
}

impl Executor for ImmediateUnpacker {
fn dispatch(&self, mut item: Item) -> Box<dyn Iterator<Item = Item> + '_> {
perform(&mut item);
Box::new(Some(item).into_iter())
fn dispatch(&self, mut item: Item) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
item.result = match &mut item.kind {
super::Kind::Directory => super::create_dir(&item.full_path),
super::Kind::File(ref contents) => {
super::write_file(&item.full_path, &contents, item.mode)
}
super::Kind::IncrementalFile(_incremental_file) => {
return {
// If there is a pending error, return it, otherwise stash the
// Item for eventual return when the file is finished.
let mut guard = self.incremental_state.lock().unwrap();
if let Some(ref mut state) = *guard {
if state.err.is_some() {
let err = state.err.take().unwrap();
item.result = err;
item.finish = item
.start
.map(|s| Instant::now().saturating_duration_since(s));
*guard = None;
Box::new(Some(CompletedIo::Item(item)).into_iter())
} else {
state.item = Some(item);
Box::new(None.into_iter())
}
} else {
unreachable!();
}
};
}
};
item.finish = item
.start
.map(|s| Instant::now().saturating_duration_since(s));
Box::new(Some(CompletedIo::Item(item)).into_iter())
}

fn join(&mut self) -> Box<dyn Iterator<Item = CompletedIo>> {
self.deque()
}

fn join(&mut self) -> Box<dyn Iterator<Item = Item>> {
Box::new(None.into_iter())
fn completed(&self) -> Box<dyn Iterator<Item = CompletedIo>> {
self.deque()
}

fn completed(&self) -> Box<dyn Iterator<Item = Item>> {
Box::new(None.into_iter())
fn incremental_file_state(&self) -> super::IncrementalFileState {
let mut state = self.incremental_state.lock().unwrap();
if state.is_some() {
unreachable!();
} else {
*state = Some(_IncrementalFileState {
completed_chunks: vec![],
err: None,
item: None,
finished: false,
});
super::IncrementalFileState::Immediate(self.incremental_state.clone())
}
}
}

/// The non-shared state for writing a file incrementally
#[derive(Debug)]
pub(super) struct IncrementalFileWriter {
state: IncrementalFileState,
file: Option<File>,
path_display: String,
}

impl IncrementalFileWriter {
#[allow(unused_variables)]
pub fn new<P: AsRef<Path>>(
path: P,
mode: u32,
state: IncrementalFileState,
) -> std::result::Result<Self, io::Error> {
let mut opts = OpenOptions::new();
#[cfg(unix)]
{
use std::os::unix::fs::OpenOptionsExt;
opts.mode(mode);
}
let path = path.as_ref();
let path_display = format!("{}", path.display());
let file = Some({
trace_scoped!("creat", "name": path_display);
opts.write(true).create(true).truncate(true).open(path)?
});
Ok(IncrementalFileWriter {
file,
state,
path_display,
})
}

pub fn chunk_submit(&mut self, chunk: Vec<u8>) -> bool {
if (self.state.lock().unwrap()).is_none() {
return false;
}
match self.write(chunk) {
Ok(v) => v,
Err(e) => {
let mut state = self.state.lock().unwrap();
if let Some(ref mut state) = *state {
state.err.replace(Err(e));
state.finished = true;
false
} else {
false
}
}
}
}

fn write(&mut self, chunk: Vec<u8>) -> std::result::Result<bool, io::Error> {
let mut state = self.state.lock().unwrap();
if let Some(ref mut state) = *state {
if let Some(ref mut file) = (&mut self.file).as_mut() {
// Length 0 vector is used for clean EOF signalling.
if chunk.is_empty() {
trace_scoped!("close", "name:": self.path_display);
drop(std::mem::take(&mut self.file));
state.finished = true;
} else {
trace_scoped!("write_segment", "name": self.path_display, "len": chunk.len());
file.write_all(&chunk)?;

state.completed_chunks.push(chunk.len());
}
Ok(true)
} else {
Ok(false)
}
} else {
unreachable!();
}
}
}
Loading