diff --git a/Cargo.toml b/Cargo.toml index bbd1f3fe7..b58896f77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,9 @@ lzma-rs = { version = "0.3", default-features = false, optional = true } [target.'cfg(any(all(target_arch = "arm", target_pointer_width = "32"), target_arch = "mips", target_arch = "powerpc"))'.dependencies] crossbeam-utils = "0.8.20" +[target.'cfg(unix)'.dependencies] +libc = { version = "0.2.155", optional = true } + [target.'cfg(fuzzing)'.dependencies] arbitrary = { version = "1.3.2", features = ["derive"] } @@ -62,7 +65,8 @@ walkdir = "2.5" time = { workspace = true, features = ["formatting", "macros"] } anyhow = "1.0.60" clap = { version = "=4.4.18", features = ["derive"] } -tempfile = "3.8" +tempdir = "0.3.7" +tempfile = "3.10.1" [features] aes-crypto = ["aes", "constant_time_eq", "hmac", "pbkdf2", "sha1", "rand", "zeroize"] @@ -79,6 +83,7 @@ deflate-zopfli = ["zopfli", "_deflate-any"] lzma = ["lzma-rs/stream"] unreserved = [] xz = ["lzma-rs/raw_decoder"] +parallelism = ["libc"] default = [ "aes-crypto", "bzip2", @@ -101,3 +106,7 @@ harness = false [[bench]] name = "merge_archive" harness = false + +[[bench]] +name = "extract" +harness = false diff --git a/benches/extract.rs b/benches/extract.rs new file mode 100755 index 000000000..b36fd362c --- /dev/null +++ b/benches/extract.rs @@ -0,0 +1,86 @@ +use bencher::{benchmark_group, benchmark_main}; + +use bencher::Bencher; +use tempdir::TempDir; + +use std::fs; +use std::path::Path; + +use zip::result::ZipResult; +use zip::ZipArchive; + +#[cfg(all(feature = "parallelism", feature = "bzip2", unix))] +use zip::read::{split_extract, ExtractionParameters}; + +/* This archive has a set of entries repeated 20x: + * - 200K random data, stored uncompressed (CompressionMethod::Stored) + * - 246K text data (the project gutenberg html version of king lear) + * (CompressionMethod::Bzip2, compression level 1) (project gutenberg ebooks are public domain) + * + * The full archive file is 5.3MB. 
+ */ +fn get_test_archive() -> ZipResult<ZipArchive<fs::File>> { + let path = + Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/data/stored-and-compressed-text.zip"); + let file = fs::File::open(path)?; + ZipArchive::new(file) +} + +fn extract_basic(bench: &mut Bencher) { + let mut readable_archive = get_test_archive().unwrap(); + let total_size: u64 = readable_archive + .decompressed_size() + .unwrap() + .try_into() + .unwrap(); + + let parent = TempDir::new("zip-extract").unwrap(); + + bench.bytes = total_size; + bench.bench_n(1, |bench| { + bench.iter(move || { + let outdir = TempDir::new_in(parent.path(), "bench-subdir") + .unwrap() + .into_path(); + readable_archive.extract(outdir).unwrap(); + }); + }); +} + +#[cfg(all(feature = "parallelism", feature = "bzip2", unix))] +const DECOMPRESSION_THREADS: usize = 8; + +#[cfg(all(feature = "parallelism", feature = "bzip2", unix))] +fn extract_split(bench: &mut Bencher) { + let readable_archive = get_test_archive().unwrap(); + let total_size: u64 = readable_archive + .decompressed_size() + .unwrap() + .try_into() + .unwrap(); + + let params = ExtractionParameters { + decompression_threads: DECOMPRESSION_THREADS, + ..Default::default() + }; + + let parent = TempDir::new("zip-extract").unwrap(); + + bench.bytes = total_size; + bench.bench_n(1, |bench| { + bench.iter(move || { + let outdir = TempDir::new_in(parent.path(), "bench-subdir") + .unwrap() + .into_path(); + split_extract(&readable_archive, &outdir, params.clone()).unwrap(); + }); + }); +} + +#[cfg(not(all(feature = "parallelism", feature = "bzip2", unix)))] +benchmark_group!(benches, extract_basic); + +#[cfg(all(feature = "parallelism", feature = "bzip2", unix))] +benchmark_group!(benches, extract_basic, extract_split); + +benchmark_main!(benches); diff --git a/src/read.rs b/src/read.rs index 39686012d..0b6d1fe02 100644 --- a/src/read.rs +++ b/src/read.rs @@ -40,6 +40,13 @@ pub(crate) mod xz; pub(crate) mod magic_finder; +#[cfg(feature = "parallelism")] +pub(crate) mod 
pipelining; +#[cfg(all(unix, feature = "parallelism"))] +pub use pipelining::split_extraction::{split_extract, ExtractionParameters, SplitExtractionError}; +#[cfg(feature = "parallelism")] +pub(crate) mod split; + // Put the struct declaration in a private module to convince rustdoc to display ZipArchive nicely pub(crate) mod zip_archive { use indexmap::IndexMap; diff --git a/src/read/pipelining.rs b/src/read/pipelining.rs new file mode 100644 index 000000000..1f4bb7fc4 --- /dev/null +++ b/src/read/pipelining.rs @@ -0,0 +1,1209 @@ +//! Pipelined extraction into a filesystem directory. + +#![allow(clippy::needless_lifetimes)] +#![allow(unknown_lints)] +#![allow(non_local_definitions)] +#![cfg_attr(not(unix), allow(dead_code))] + +pub mod path_splitting { + use displaydoc::Display; + use thiserror::Error; + + use std::collections::BTreeMap; + + use crate::spec::is_dir; + + /// Errors encountered during path splitting; each variant holds the offending entry path, then a static reason. + #[derive(Debug, Display, Error)] + pub enum PathSplitError<'a> { + /// entry path {0:?} would escape extraction dir: {1} + ExtractionPathEscapesDirectory(&'a str, &'static str), + /// duplicate entry path {0:?} used: {1} + DuplicatePath(&'a str, &'static str), + } + + /* NB: path_to_string() performs some of this logic, but is intended to coerce filesystem + * paths into zip entry names, whereas we are processing entry names from a zip archive, so we + * perform much less error handling. */ + pub(crate) fn normalize_parent_dirs<'a>( + entry_path: &'a str, + ) -> Result<(Vec<&'a str>, bool), PathSplitError<'a>> { + /* The ZIP spec states (APPNOTE 4.4.17) that file paths are in Unix format, and Unix + * filesystems treat a backslash as a normal character. Thus they should be allowed on Unix + * and replaced with \u{fffd} on Windows. 
+ */ + if entry_path.starts_with('/') { + return Err(PathSplitError::ExtractionPathEscapesDirectory( + entry_path, + "path began with '/' and is absolute", + )); + } + let is_dir = is_dir(entry_path); + + let mut ret: Vec<&'a str> = Vec::new(); + for component in entry_path.split('/') { + match component { + /* Skip over repeated separators "//". We check separately for ending '/' with the + * `is_dir` variable. */ + "" => (), + /* Skip over redundant "." separators. */ + "." => (), + /* If ".." is present, pop off the last element or return an error. */ + ".." => { + if ret.pop().is_none() { + return Err(PathSplitError::ExtractionPathEscapesDirectory( + entry_path, + "path has too many '..' components and would escape the containing dir", + )); + } + } + _ => { + ret.push(component); + } + } + } + + Ok((ret, is_dir)) + } + + fn split_dir_file_components<'a, 's>( + all_components: &'s [&'a str], + is_dir: bool, + ) -> (&'s [&'a str], Option<&'a str>) { + if is_dir { + (all_components, None) + } else { + let (last, rest) = all_components.split_last().expect("a file entry must have at least one path component"); + (rest, Some(last)) + } + } + + #[derive(PartialEq, Eq, Debug, Clone)] + pub(crate) struct DirEntry<'a, Data> { + pub properties: Option<Data>, + pub children: BTreeMap<&'a str, Box<FSEntry<'a, Data>>>, + } + + impl<'a, Data> Default for DirEntry<'a, Data> { + fn default() -> Self { + Self { + properties: None, + children: BTreeMap::new(), + } + } + } + + #[derive(PartialEq, Eq, Debug, Clone)] + pub(crate) enum FSEntry<'a, Data> { + Dir(DirEntry<'a, Data>), + File(Data), + } + + pub(crate) trait DirByMode { + fn is_dir_by_mode(&self) -> bool; + } + + impl DirByMode for &crate::types::ZipFileData { + fn is_dir_by_mode(&self) -> bool { + crate::types::ZipFileData::is_dir_by_mode(self) + } + } + + /* This returns a BTreeMap and not a DirEntry because we do not allow setting permissions or + * any other data for the top-level extraction directory. 
*/ + pub(crate) fn lexicographic_entry_trie<'a, Data>( + all_entries: impl IntoIterator, + ) -> Result>>, PathSplitError<'a>> + where + Data: DirByMode, + { + let mut base_dir: DirEntry<'a, Data> = DirEntry::default(); + + for (entry_path, data) in all_entries { + /* Begin at the top-level directory. We will recurse downwards. */ + let mut cur_dir = &mut base_dir; + + /* Split entries by directory components, and normalize any non-literal paths + * (e.g. '..', '.', leading '/', repeated '/'). */ + let (all_components, is_dir) = normalize_parent_dirs(entry_path)?; + + /* If the entry resolves to the top-level directory, we don't error, but instead just + * avoid writing any data to that directory entry. */ + if all_components.is_empty() { + continue; + } + + /* If the entry is a directory by mode, then it does not need to end in '/'. */ + let is_dir = is_dir || data.is_dir_by_mode(); + /* Split basename and dirname. */ + let (dir_components, file_component) = + split_dir_file_components(&all_components, is_dir); + + for component in dir_components.iter() { + let next_subdir = cur_dir + .children + .entry(component) + .or_insert_with(|| Box::new(FSEntry::Dir(DirEntry::default()))); + cur_dir = match next_subdir.as_mut() { + FSEntry::File(_) => { + return Err(PathSplitError::DuplicatePath( + entry_path, + "a file was already registered at the same path as this dir entry", + )); + } + FSEntry::Dir(ref mut subdir) => subdir, + } + } + match file_component { + Some(filename) => { + /* We can't handle duplicate file paths, as that might mess up our + * parallelization strategy. 
*/ + if cur_dir.children.contains_key(filename) { + return Err(PathSplitError::DuplicatePath( + entry_path, + "an entry was already registered at the same path as this file entry", + )); + } + cur_dir + .children + .insert(filename, Box::new(FSEntry::File(data))); + } + None => { + /* We can't handle duplicate directory entries for the exact same normalized + * path, as it's not clear how to merge the possibility of two separate file + * permissions. */ + if cur_dir.properties.replace(data).is_some() { + return Err(PathSplitError::DuplicatePath( + entry_path, + "another directory was already registered at this path", + )); + } + } + } + } + + let DirEntry { + properties, + children, + } = base_dir; + debug_assert!(properties.is_none(), "setting metadata on the top-level extraction dir is not allowed and should have been filtered out"); + Ok(children) + } + + /* TODO: use proptest for all of this! */ + #[cfg(test)] + mod test { + use super::*; + + #[test] + fn path_normalization() { + assert_eq!( + normalize_parent_dirs("a/b/c").unwrap(), + (vec!["a", "b", "c"], false) + ); + assert_eq!(normalize_parent_dirs("./a").unwrap(), (vec!["a"], false)); + assert_eq!(normalize_parent_dirs("a/../b/").unwrap(), (vec!["b"], true)); + assert_eq!(normalize_parent_dirs("a/").unwrap(), (vec!["a"], true)); + assert!(normalize_parent_dirs("/a").is_err()); + assert_eq!(normalize_parent_dirs("\\a").unwrap(), (vec!["\\a"], false)); + assert_eq!( + normalize_parent_dirs("a\\b/").unwrap(), + (vec!["a\\b"], true) + ); + assert!(normalize_parent_dirs("a/../../b").is_err()); + assert_eq!(normalize_parent_dirs("./").unwrap(), (vec![], true)); + } + + #[test] + fn split_dir_file() { + assert_eq!( + split_dir_file_components(&["a", "b", "c"], true), + (["a", "b", "c"].as_ref(), None) + ); + assert_eq!( + split_dir_file_components(&["a", "b", "c"], false), + (["a", "b"].as_ref(), Some("c")) + ); + } + + impl DirByMode for usize { + fn is_dir_by_mode(&self) -> bool { + false + } + } + + #[test] + 
fn lex_trie() { + assert_eq!( + lexicographic_entry_trie([ + ("a/b/", 1usize), + ("a/", 2), + ("a/b/c", 3), + ("d/", 4), + ("e", 5), + ("a/b/f/g", 6), + ]) + .unwrap(), + [ + ( + "a", + FSEntry::Dir(DirEntry { + properties: Some(2), + children: [( + "b", + FSEntry::Dir(DirEntry { + properties: Some(1), + children: [ + ("c", FSEntry::File(3).into()), + ( + "f", + FSEntry::Dir(DirEntry { + properties: None, + children: [("g", FSEntry::File(6).into())] + .into_iter() + .collect(), + }) + .into() + ), + ] + .into_iter() + .collect(), + }) + .into() + )] + .into_iter() + .collect(), + }) + .into() + ), + ( + "d", + FSEntry::Dir(DirEntry { + properties: Some(4), + children: BTreeMap::new(), + }) + .into() + ), + ("e", FSEntry::File(5).into()) + ] + .into_iter() + .collect() + ); + } + + #[test] + fn lex_trie_dir_by_mode() { + #[derive(PartialEq, Eq, Debug)] + struct Mode(usize, bool); + + impl DirByMode for Mode { + fn is_dir_by_mode(&self) -> bool { + self.1 + } + } + + assert_eq!( + lexicographic_entry_trie([ + ("a/b", Mode(1, true)), + ("a/", Mode(2, false)), + ("a/b/c", Mode(3, false)), + ("d", Mode(4, true)), + ("e", Mode(5, false)), + ("a/b/f/g", Mode(6, false)), + ]) + .unwrap(), + [ + ( + "a", + FSEntry::Dir(DirEntry { + properties: Some(Mode(2, false)), + children: [( + "b", + FSEntry::Dir(DirEntry { + properties: Some(Mode(1, true)), + children: [ + ("c", FSEntry::File(Mode(3, false)).into()), + ( + "f", + FSEntry::Dir(DirEntry { + properties: None, + children: [( + "g", + FSEntry::File(Mode(6, false)).into() + )] + .into_iter() + .collect(), + }) + .into() + ), + ] + .into_iter() + .collect(), + }) + .into() + )] + .into_iter() + .collect(), + }) + .into() + ), + ( + "d", + FSEntry::Dir(DirEntry { + properties: Some(Mode(4, true)), + children: BTreeMap::new(), + }) + .into() + ), + ("e", FSEntry::File(Mode(5, false)).into()) + ] + .into_iter() + .collect() + ); + } + } +} + +pub mod handle_creation { + use displaydoc::Display; + use thiserror::Error; + + use 
std::cmp; + use std::collections::{HashMap, VecDeque}; + use std::fs; + use std::hash; + use std::io; + use std::path::{Path, PathBuf}; + + use crate::types::ZipFileData; + + use super::path_splitting::{DirEntry, FSEntry}; + + /// Errors encountered when creating output handles for extracting entries to. + #[derive(Debug, Display, Error)] + pub enum HandleCreationError { + /// i/o error: {0} + Io(#[from] io::Error), + } + + /// Wrapper for memory location of the ZipFileData in Shared; equality and hashing use that address, not the entry's contents. + /// + /// Enables quick comparison and hash table lookup without needing to implement Hash/Eq for + /// ZipFileData more generally. + #[derive(Debug)] + pub(crate) struct ZipDataHandle<'a>(&'a ZipFileData); + + impl<'a> ZipDataHandle<'a> { + #[inline(always)] + const fn ptr(&self) -> *const ZipFileData { + self.0 + } + + #[inline(always)] + pub const fn wrap(data: &'a ZipFileData) -> Self { + Self(data) + } + } + + impl<'a> cmp::PartialEq for ZipDataHandle<'a> { + #[inline(always)] + fn eq(&self, other: &Self) -> bool { + self.ptr() == other.ptr() + } + } + + impl<'a> cmp::Eq for ZipDataHandle<'a> {} + + impl<'a> hash::Hash for ZipDataHandle<'a> { + #[inline(always)] + fn hash<H: hash::Hasher>(&self, state: &mut H) { + self.ptr().hash(state); + } + } + + /* TODO: figure out how to handle symlinks! These are especially difficult because: + * (1) windows symlinks files and directories differently, and only on newer windows versions, + * (2) later entries in the zip may refer to symlink paths from earlier in the zip. + * + * Of these issues, (2) is more difficult and intrinsic to the problem space. In order to + * correctly extract symlinks in a pipelined/parallel fashion, we need to identify these order + * dependencies and schedule the symlink dereference (reading the target value from the zip) + * before we create any directories or allocate any output file handles that dereference that + * symlink. 
This is less of a problem with the synchronous in-order extraction because it + * creates any symlinks immediately (it imposes a total ordering dependency over all entries). + */ + pub(crate) struct AllocatedHandles<'a> { + pub file_handle_mapping: HashMap, fs::File>, + pub perms_todo: Vec<(PathBuf, fs::Permissions)>, + } + + pub(crate) fn transform_entries_to_allocated_handles<'a>( + top_level_extraction_dir: &Path, + lex_entry_trie: impl IntoIterator>)>, + ) -> Result, HandleCreationError> { + #[cfg(unix)] + use std::os::unix::fs::PermissionsExt; + + /* TODO: we create subdirs by constructing path strings, which may fail at overlarge + * paths. This may be fixable on unix with mkdirat()/openat(), but would require more + * complex platform-specific programming. However, the result would likely decrease the + * number of syscalls, which may also improve performance. It may also be slightly easier to + * follow the logic if we can refer to directory inodes instead of constructing path strings + * as a proxy. This should be considered if requested by users. */ + fs::create_dir_all(top_level_extraction_dir)?; + + #[allow(clippy::mutable_key_type)] + let mut file_handle_mapping: HashMap, fs::File> = HashMap::new(); + let mut entry_queue: VecDeque<(PathBuf, Box>)> = + lex_entry_trie + .into_iter() + .map(|(entry_name, entry_data)| { + (top_level_extraction_dir.join(entry_name), entry_data) + }) + .collect(); + let mut perms_todo: Vec<(PathBuf, fs::Permissions)> = Vec::new(); + + while let Some((path, entry)) = entry_queue.pop_front() { + match *entry { + FSEntry::File(data) => { + let key = ZipDataHandle::wrap(data); + + #[cfg_attr(not(unix), allow(unused_variables))] + if let Some(mode) = data.unix_mode() { + /* TODO: consider handling the readonly bit on windows. We don't currently + * do this in normal extraction, so we don't need to do this yet for + * pipelining. */ + + /* Write the desired perms to the perms queue. 
*/ + #[cfg(unix)] + perms_todo.push((path.clone(), fs::Permissions::from_mode(mode))); + } + + let handle = fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path)?; + assert!(file_handle_mapping.insert(key, handle).is_none()); + } + #[cfg_attr(not(unix), allow(unused_variables))] + FSEntry::Dir(DirEntry { + properties, + children, + }) => { + /* FIXME: use something like make_writable_dir_all() and then add to + * perms_todo! */ + match fs::create_dir(&path) { + Err(e) if e.kind() == io::ErrorKind::AlreadyExists => (), + Err(e) => return Err(e.into()), + Ok(()) => (), + } + + /* (1) Write any desired perms to the perms queue. */ + #[cfg(unix)] + if let Some(perms_to_set) = properties.and_then(|data| data.unix_mode()) { + perms_todo.push((path.clone(), fs::Permissions::from_mode(perms_to_set))); + } + /* (2) Generate sub-entries by constructing full paths. */ + for (sub_name, entry) in children.into_iter() { + let full_name = path.join(sub_name); + entry_queue.push_back((full_name, entry)); + } + } + } + } + + /* NB: Iterate in *REVERSE* so that child directories are set before parents! Setting + * a parent readonly would stop us from setting child perms. */ + perms_todo.reverse(); + + Ok(AllocatedHandles { + file_handle_mapping, + perms_todo, + }) + } + + #[cfg(test)] + mod test { + use tempdir::TempDir; + + use std::io::{prelude::*, Cursor}; + + use crate::write::{SimpleFileOptions, ZipWriter}; + + use super::super::path_splitting::lexicographic_entry_trie; + use super::*; + + #[test] + fn subdir_creation() { + #[cfg(unix)] + use std::os::unix::fs::PermissionsExt; + + /* Create test archive. */ + let mut zip = ZipWriter::new(Cursor::new(Vec::new())); + let opts = SimpleFileOptions::default(); + + zip.start_file("a/b/c", opts.unix_permissions(0o765)) + .unwrap(); + zip.write_all(b"asdf").unwrap(); + + zip.add_directory("a/b", opts.unix_permissions(0o500)) + .unwrap(); + + /* Create readable archive and extraction dir. 
*/ + let zip = zip.finish_into_readable().unwrap(); + let td = TempDir::new("pipeline-test").unwrap(); + + /* (1) Create lex entry trie. */ + let trie = lexicographic_entry_trie( + zip.shared + .files + .iter() + .map(|(name, data)| (name.as_ref(), data)), + ) + .unwrap(); + + /* (2) Generate handles. */ + #[cfg_attr(not(unix), allow(unused_variables))] + let AllocatedHandles { + file_handle_mapping, + perms_todo, + } = transform_entries_to_allocated_handles(td.path(), trie).unwrap(); + + let mut files: Vec<_> = file_handle_mapping.into_iter().collect(); + assert_eq!(1, files.len()); + let (file_data, mut file) = files.pop().unwrap(); + assert_eq!( + file_data, + ZipDataHandle::wrap(zip.shared.files.get_index(0).unwrap().1) + ); + /* We didn't write anything to the file, so it's still empty at this point. */ + assert_eq!(b"", &fs::read(td.path().join("a/b/c")).unwrap()[..]); + file.write_all(b"asdf").unwrap(); + file.sync_data().unwrap(); + /* Now the data is synced! */ + assert_eq!(b"asdf", &fs::read(td.path().join("a/b/c")).unwrap()[..]); + + #[cfg(unix)] + assert_eq!( + perms_todo + .into_iter() + .map(|(path, perms)| (path, perms.mode() & 0o777)) + .collect::>(), + vec![ + (td.path().join("a/b/c"), 0o765), + (td.path().join("a/b"), 0o500), + ] + ); + } + } +} + +#[cfg(unix)] +pub mod split_extraction { + use displaydoc::Display; + use thiserror::Error; + + use std::fs; + use std::io; + use std::mem::{self, MaybeUninit}; + use std::path::Path; + use std::sync::mpsc; + use std::thread; + + use crate::compression::CompressionMethod; + use crate::read::ZipArchive; + use crate::read::{make_crypto_reader, make_reader}; + use crate::result::ZipError; + use crate::spec::FixedSizeBlock; + use crate::types::{ZipFileData, ZipLocalEntryBlock}; + + #[cfg(not(target_os = "linux"))] + use crate::read::split::pipe::unix::{PipeReadBufferSplicer, PipeWriteBufferSplicer}; + #[cfg(target_os = "linux")] + use crate::read::split::{ + file::linux::FileCopy, + 
pipe::linux::{PipeReadSplicer, PipeWriteSplicer}, + }; + use crate::read::split::{ + file::{ + unix::{FileBufferCopy, FileInput, FileOutput}, + CopyRange, InputFile, + }, + pipe::{unix::create_pipe, ReadSplicer, WriteSplicer}, + util::{copy_via_buf, TakeWrite}, + }; + + use super::{ + handle_creation::{ + transform_entries_to_allocated_handles, AllocatedHandles, HandleCreationError, + ZipDataHandle, + }, + path_splitting::{lexicographic_entry_trie, PathSplitError}, + }; + + /// Errors encountered during the split pipelined extraction process. + #[derive(Debug, Display, Error)] + pub enum SplitExtractionError { + /// i/o error: {0} + Io(#[from] io::Error), + /// zip error: {0} + Zip(#[from] ZipError), + /// path split error: {0} + PathSplit(String), + /// handle creation error: {0} + HandleCreation(#[from] HandleCreationError), + } + + impl<'a> From> for SplitExtractionError { + fn from(e: PathSplitError<'a>) -> Self { + let msg = format!("{}", e); + Self::PathSplit(msg) + } + } + + /* TODO: make this share code with find_data_start()! */ + fn get_or_find_data_start(data: &ZipFileData, input_file: InF) -> Result + where + InF: InputFile, + { + // TODO: use .get_or_try_init() once stabilized to provide a closure returning a Result! + if let Some(data_start) = data.data_start.get() { + return Ok(*data_start); + } + + let block = { + let block: MaybeUninit<[u8; mem::size_of::()]> = + MaybeUninit::uninit(); + let mut block: [MaybeUninit; mem::size_of::()] = + unsafe { mem::transmute(block) }; + + input_file.pread_all(data.header_start, &mut block[..])?; + + let block: MaybeUninit<[u8; mem::size_of::()]> = + unsafe { mem::transmute(block) }; + unsafe { block.assume_init() } + }; + // Parse static-sized fields and check the magic value. + let block = ZipLocalEntryBlock::interpret(block.as_ref())?; + + // Calculate the end of the local header from the fields we just parsed. 
+ let variable_fields_len: u64 = + // Each of these fields must be converted to u64 before adding, as the result may + // easily overflow a u16. + block.file_name_length as u64 + block.extra_field_length as u64; + let local_entry_block_size: u64 = mem::size_of::().try_into().unwrap(); + let data_start: u64 = data.header_start + local_entry_block_size + variable_fields_len; + + // Set the value so we don't have to read it again. + match data.data_start.set(data_start) { + Ok(()) => (), + // If the value was already set in the meantime, ensure it matches. + Err(_) => { + assert_eq!(*data.data_start.get().unwrap(), data_start); + } + } + Ok(data_start) + } + + /// Parameters to control the degree of parallelism used for extraction. + #[derive(Debug, Clone)] + pub struct ExtractionParameters { + /// Number of threads used for decompression. + /// + /// Default value: 4. + /// + /// Note that multiple times this many threads will be spawned by [`split_extract()`] as + /// part of the pipelined process. Only this many threads will be used to perform + /// decompression in rust code, but other threads will be used to wait on I/O from + /// the kernel. + pub decompression_threads: usize, + /// Size of buffer used to copy a decompressed entry into the corresponding output pipe. + /// + /// Default value: 1MB. + pub decompression_copy_buffer_length: usize, + /// Size of buffer used to copy stored entries into the output file. + /// + /// Used on non-Linux platforms without + /// [`copy_file_range()`](https://www.gnu.org/software/libc/manual/html_node/Copying-File-Data.html), + /// as well as on Linux when the input and output file handles are on separate devices. + /// + /// Default value: 1MB. + pub file_range_copy_buffer_length: usize, + /// Size of buffer used to splice contents from a pipe into an output file handle. + /// + /// Used on non-Linux platforms without [`splice()`](https://en.wikipedia.org/wiki/Splice_(system_call)). + /// + /// Default value: 1MB. 
+ #[cfg(not(target_os = "linux"))] + pub splice_read_buffer_length: usize, + /// Size of buffer used to splice contents from an input file handle into a pipe. + /// + /// Used on non-Linux platforms without [`splice()`](https://en.wikipedia.org/wiki/Splice_(system_call)). + /// + /// Default value: 1MB. + #[cfg(not(target_os = "linux"))] + pub splice_write_buffer_length: usize, + } + + impl Default for ExtractionParameters { + fn default() -> Self { + Self { + decompression_threads: 4, + decompression_copy_buffer_length: 1024 * 1024, + file_range_copy_buffer_length: 1024 * 1024, + #[cfg(not(target_os = "linux"))] + splice_read_buffer_length: 1024 * 1024, + #[cfg(not(target_os = "linux"))] + splice_write_buffer_length: 1024 * 1024, + } + } + } + + fn wrap_spawn_err<'scope>( + err_sender: mpsc::Sender, + f: impl FnOnce() -> Result<(), SplitExtractionError> + Send + 'scope, + ) -> impl FnOnce() + Send + 'scope { + move || match f() { + Ok(()) => (), + #[allow(clippy::single_match)] + Err(e) => match err_sender.send(e) { + Ok(()) => (), + /* We use an async sender, so this should only error if the receiver has hung + * up, which occurs when we return a previous error from the main thread. */ + Err(mpsc::SendError(_)) => (), + }, + } + } + + /// Extract all entries in parallel using a pipelined strategy. + pub fn split_extract( + archive: &ZipArchive, + top_level_extraction_dir: &Path, + params: ExtractionParameters, + ) -> Result<(), SplitExtractionError> { + let ZipArchive { + reader: ref input_file, + ref shared, + .. + } = archive; + let ExtractionParameters { + decompression_threads, + decompression_copy_buffer_length, + file_range_copy_buffer_length, + #[cfg(not(target_os = "linux"))] + splice_read_buffer_length, + #[cfg(not(target_os = "linux"))] + splice_write_buffer_length, + } = params; + + /* (1) Create lex entry trie. 
*/ + let trie = lexicographic_entry_trie( + shared + .files + .iter() + .map(|(name, data)| (name.as_ref(), data)), + )?; + /* (2) Generate handles. */ + let AllocatedHandles { + file_handle_mapping, + perms_todo, + } = transform_entries_to_allocated_handles(top_level_extraction_dir, trie)?; + + /* (3) Create a wrapper over the input file which uses pread() to read from multiple + * sections in parallel across a thread pool. */ + let input_file = FileInput::new(input_file)?; + + thread::scope(move |scope| { + /* (4) Create n parallel consumer pipelines. Threads are spawned into the scope, so + * panics get propagated automatically, and all threads are joined at the end of the + * scope. wrap_spawn_err() is used to enable thread closures to return a Result and + * asynchronously propagate the error back up to the main scope thread. */ + let (err_sender, err_receiver) = mpsc::channel::(); + /* This channel is used to notify the zip-input-reader thread when a consumer has + * completed decompressing/copying an entry, and is ready to receive new input. This is + * neither round-robin nor LRU: no thread is prioritized over any other, and new + * entries are sent off to workers in order of when they notify the zip-input-reader + * thread of their readiness. */ + let (queue_sender, queue_receiver) = mpsc::channel::(); + let input_writer_infos: Vec> = (0 + ..decompression_threads) + .map(|consumer_index| { + /* Create pipes to write entries through. */ + let (compressed_read_end, compressed_write_end) = create_pipe()?; + let (uncompressed_read_end, uncompressed_write_end) = create_pipe()?; + + /* Create channels to send entries through. 
*/ + let (read_send, read_recv) = mpsc::channel::<(&ZipFileData, u64, FileOutput)>(); + let (compressed_sender, compressed_receiver) = + mpsc::channel::<(&ZipFileData, FileOutput)>(); + let (uncompressed_sender, uncompressed_receiver) = + mpsc::channel::<(&ZipFileData, FileOutput)>(); + + /* Send this consumer pipeline's index to the zip-input-reader thread when it's + * ready to receive new input. */ + let queue_sender = queue_sender.clone(); + #[allow(clippy::single_match)] + let notify_readiness = move || match queue_sender.send(consumer_index) { + Ok(()) => (), + /* Disconnected; this is expected to occur at the end of extraction. */ + Err(mpsc::SendError(_)) => (), + }; + + /* (8) Write decompressed entries to the preallocated output handles. */ + thread::Builder::new() + .name(format!("zip-output-writer-{}", consumer_index)) + .spawn_scoped( + scope, + wrap_spawn_err(err_sender.clone(), move || { + let uncompressed_receiver = uncompressed_receiver; + let mut uncompressed_read_end = uncompressed_read_end; + + #[cfg(target_os = "linux")] + let mut s = PipeReadSplicer; + #[cfg(not(target_os = "linux"))] + let mut splice_buf: Box<[u8]> = + vec![0u8; splice_read_buffer_length].into_boxed_slice(); + #[cfg(not(target_os = "linux"))] + let mut s = PipeReadBufferSplicer::new(&mut splice_buf); + + for (entry, mut output_file) in uncompressed_receiver.iter() { + s.splice_to_file_all( + &mut uncompressed_read_end, + (&mut output_file, 0), + entry.uncompressed_size.try_into().unwrap(), + )?; + let output_file = output_file.into_file(); + output_file.sync_data()?; + mem::drop(output_file); + } + + Ok(()) + }), + )?; + + /* (7) Read compressed entries, decompress them, then write them to the output + * thread. 
*/ + thread::Builder::new() + .name(format!("zip-decompressor-{}", consumer_index)) + .spawn_scoped( + scope, + wrap_spawn_err(err_sender.clone(), move || { + use io::{Read, Write}; + + let compressed_receiver = compressed_receiver; + let uncompressed_sender = uncompressed_sender; + let mut compressed_read_end = compressed_read_end; + let mut uncompressed_write_end = uncompressed_write_end; + + /* Create a persistent heap-allocated buffer to copy decompressed + * data through. We will be reusing this allocation, so pay the cost + * of initialization exactly once. */ + let mut buffer_allocation: Box<[u8]> = + vec![0u8; decompression_copy_buffer_length].into_boxed_slice(); + + for (entry, output_file) in compressed_receiver.iter() { + /* Construct the decompressing reader. */ + let limited_reader = ((&mut compressed_read_end) + as &mut dyn Read) + .take(entry.compressed_size); + let crypto_reader = + make_crypto_reader(entry, limited_reader, None, None)?; + let mut decompressing_reader = make_reader( + entry.compression_method, + entry.crc32, + crypto_reader, + )?; + let mut limited_writer = TakeWrite::take( + uncompressed_write_end.by_ref(), + entry.uncompressed_size, + ); + /* Send the entry and output file to the writer thread before + * writing this entry's decompressed contents. */ + uncompressed_sender.send((entry, output_file)).unwrap(); + copy_via_buf( + &mut decompressing_reader, + &mut limited_writer, + &mut buffer_allocation, + )?; + } + + Ok(()) + }), + )?; + + /* (6) Wait on splicing the data from this entry, or using copy_file_range() to + * copy it if uncompressed. 
*/ + thread::Builder::new() + .name(format!("zip-reader-{}", consumer_index)) + .spawn_scoped( + scope, + wrap_spawn_err(err_sender.clone(), move || { + let notify_readiness = notify_readiness; + let read_recv = read_recv; + let compressed_sender = compressed_sender; + let mut compressed_write_end = compressed_write_end; + + let mut copy_buf: Box<[u8]> = + vec![0u8; file_range_copy_buffer_length].into_boxed_slice(); + let mut buffer_c = FileBufferCopy::new(&mut copy_buf); + + #[cfg(target_os = "linux")] + let mut s = PipeWriteSplicer::new(); + #[cfg(not(target_os = "linux"))] + let mut splice_buf: Box<[u8]> = + vec![0u8; splice_write_buffer_length].into_boxed_slice(); + #[cfg(not(target_os = "linux"))] + let mut s = PipeWriteBufferSplicer::new(&mut splice_buf); + + /* Notify readiness *after* setting up copy buffers, but *before* + * waiting on any entries sent from the zip-input-reader thread, + * since zip-input-reader won't send us anything over `read_recv` + * until we notify them. */ + notify_readiness(); + + for (entry, data_start, mut output_file) in read_recv.iter() { + /* If uncompressed, we can use copy_file_range() directly, and + * avoid splicing through our decompression pipeline. */ + if entry.compression_method == CompressionMethod::Stored { + assert_eq!(entry.compressed_size, entry.uncompressed_size); + let copy_len: usize = + entry.uncompressed_size.try_into().unwrap(); + + #[cfg(target_os = "linux")] + if input_file.on_same_device(&output_file)? { + /* Linux can map pages from one file to another + * directly, without copying through userspace, but + * only if the files are located on the same device. 
*/ + let mut file_c = FileCopy::new(); + file_c.copy_file_range_all( + (&input_file, data_start), + (&mut output_file, 0), + copy_len, + )?; + } else { + buffer_c.copy_file_range_all( + (&input_file, data_start), + (&mut output_file, 0), + copy_len, + )?; + } + #[cfg(not(target_os = "linux"))] + buffer_c.copy_file_range_all( + (&input_file, data_start), + (&mut output_file, 0), + copy_len, + )?; + + let output_file = output_file.into_file(); + /* fsync(2) says setting the file length is a form of + * metadata that requires fsync() over fdatasync(); it's + * unclear whether rust already performs that in the + * File::set_len() call performed in the FileOutput::new() + * constructor, but this shouldn't really matter for + * performance. */ + output_file.sync_all()?; + /* This is done automatically, but this way we can ensure + * we've correctly avoided aliasing the output file in this + * branch. */ + mem::drop(output_file); + + /* We're now completely done with this entry and have + * closed the output file handle, so we can receive another + * one. */ + notify_readiness(); + continue; + } + + /* If compressed, we want to perform decompression (rust-level + * synchronous computation) in a separate thread, to avoid + * jumping back and forth in the call stack between i/o from the + * kernel and in-memory computation in rust. */ + compressed_sender.send((entry, output_file)).unwrap(); + + /* Write this compressed entry into the waiting pipe. Because + * unix pipes have a bounded kernel buffer (on Linux, 16 pages by + * default, tunable via F_SETPIPE_SZ; see pipe(7)), we will + * end up blocking on this repeated splice() call until almost + * the entire entry is decompressed in the decompressor thread. + * This is a nice form of built-in flow control for other use + * cases, but for our purposes we might like to use larger + * buffers so that we can get further ahead in I/O from the + * zip-input-reader thread. 
However, if we avoid using pipes + * from the kernel, we won't be able to take advantage of + * splice()'s zero-copy optimization on Linux. */ + /* TODO: consider using rust-level ring buffers here with + * configurable size on all platforms, trading greater memory + * allocation for further I/O readahead throughput. */ + s.splice_from_file_all( + (&input_file, data_start), + &mut compressed_write_end, + entry.compressed_size.try_into().unwrap(), + )?; + + /* Notify the zip-input-reader thread that we are ready to + * read another entry's bytes from the input file handle. */ + notify_readiness(); + } + + Ok(()) + }), + )?; + + Ok(read_send) + }) + .collect::>()?; + + /* (5) Iterate over each entry sequentially, farming it out to a pipe to decompress if + * needed. */ + thread::Builder::new() + .name("zip-input-reader".to_string()) + .spawn_scoped( + scope, + wrap_spawn_err(err_sender, move || { + #[allow(clippy::mutable_key_type)] + let mut file_handle_mapping = file_handle_mapping; + /* All consumer pipelines share the same channel to notify us of their + * identity when ready. */ + let queue_receiver = queue_receiver; + /* The only output channel we have to consumer pipelines is a single sender + * to notify them of the current entry they should be reading, + * decompressing, then writing to the preallocated output file handle. */ + let read_sends: Vec> = + input_writer_infos; + + /* Entries are ordered by their offset, so we will be going monotonically + * forward in the underlying file. */ + for entry in shared.files.values() { + /* We have already created all necessary directories, and we set any + * dir perms after extracting file contents. */ + if entry.is_dir() || entry.is_dir_by_mode() { + continue; + } + + /* Create a handle to the memory location of this entry. This allows + * us to quickly test membership without hashing any + * arbitrary-length strings/etc, and avoids the need to impl Hash/Eq + * on ZipFileData more generally. 
*/ + let handle = ZipDataHandle::wrap(entry); + /* Take the preallocated output handle for this entry out of the + * mapping. */ + let output_file = file_handle_mapping.remove(&handle).unwrap(); + /* Wrap it in FileOutput, which sets the file length to the known + * output size. */ + let output_file = + FileOutput::new(output_file, entry.uncompressed_size)?; + + /* Get the start of data for this entry without mutating any state + * using pread. */ + let data_start = get_or_find_data_start(entry, input_file)?; + + /* Wait until a free consumer is available, then send the prepared + * entry range into the waiting consumer thread. */ + let ready_consumer_index = queue_receiver.recv().unwrap(); + read_sends[ready_consumer_index] + .send((entry, data_start, output_file)) + .unwrap(); + } + + assert!(file_handle_mapping.is_empty()); + + Ok(()) + }), + )?; + + /* If no I/O errors occurred, this won't trigger. We will only be able to propagate + * a single I/O error, but this also avoids propagating any errors triggered after the + * initial one. */ + if let Some(err) = err_receiver.iter().next() { + return Err(err); + } + + /* (10) Set permissions on specified entries. */ + /* TODO: consider parallelizing this with rayon's parallel iterators. */ + for (entry_path, perms) in perms_todo.into_iter() { + fs::set_permissions(entry_path, perms)?; + } + + Ok(()) + }) + } + + #[cfg(test)] + mod test { + use tempdir::TempDir; + use tempfile; + + use std::io::prelude::*; + + use crate::write::{SimpleFileOptions, ZipWriter}; + + use super::*; + + #[test] + fn subdir_creation() { + #[cfg(unix)] + use std::os::unix::fs::PermissionsExt; + + /* Create test archive. 
*/ + let mut zip = ZipWriter::new(tempfile::tempfile().unwrap()); + let opts = SimpleFileOptions::default().compression_method(CompressionMethod::Stored); + + zip.start_file("a/b/c", opts.unix_permissions(0o765)) + .unwrap(); + zip.write_all(b"asdf").unwrap(); + + zip.add_directory("a/b", opts.unix_permissions(0o500)) + .unwrap(); + + zip.start_file( + "d/e", + opts.compression_method(CompressionMethod::Deflated) + .unix_permissions(0o755), + ) + .unwrap(); + zip.write_all(b"ffasedfasjkef").unwrap(); + + /* Create readable archive and extraction dir. */ + let zip = zip.finish_into_readable().unwrap(); + let td = TempDir::new("pipeline-test").unwrap(); + + /* Perform the whole end-to-end extraction process. */ + split_extract(&zip, td.path(), ExtractionParameters::default()).unwrap(); + + #[cfg(unix)] + assert_eq!( + 0o765, + fs::metadata(td.path().join("a/b/c")) + .unwrap() + .permissions() + .mode() + & 0o777 + ); + assert_eq!(b"asdf", &fs::read(td.path().join("a/b/c")).unwrap()[..]); + + #[cfg(unix)] + assert_eq!( + 0o500, + fs::metadata(td.path().join("a/b")) + .unwrap() + .permissions() + .mode() + & 0o777, + ); + + #[cfg(unix)] + assert_eq!( + 0o755, + fs::metadata(td.path().join("d/e")) + .unwrap() + .permissions() + .mode() + & 0o777 + ); + assert_eq!( + b"ffasedfasjkef", + &fs::read(td.path().join("d/e")).unwrap()[..] + ); + } + } +} diff --git a/src/read/split.rs b/src/read/split.rs new file mode 100644 index 000000000..2b392c179 --- /dev/null +++ b/src/read/split.rs @@ -0,0 +1,1186 @@ +//! Traits for splitting and teeing file contents into multiple parallel streams. + +#![allow(clippy::needless_lifetimes)] +#![cfg_attr(not(unix), allow(dead_code))] + +macro_rules! interruptible_buffered_io_op { + ($op:expr) => { + match $op { + Ok(n) => n, + Err(e) if e.kind() == ::std::io::ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + } + }; +} + +#[cfg_attr(not(unix), allow(unused_macros))] +macro_rules! 
syscall_errno { + ($syscall:expr) => { + match $syscall { + rc if rc < 0 => return Err(::std::io::Error::last_os_error()), + rc => rc, + } + }; +} + +pub mod file { + use std::io; + use std::mem::MaybeUninit; + use std::ops; + + pub trait FixedFile { + fn extent(&self) -> u64; + + #[inline(always)] + fn convert_range(&self, range: impl ops::RangeBounds) -> io::Result> { + let len = self.extent(); + let start = match range.start_bound() { + ops::Bound::Included(&start) => start, + ops::Bound::Excluded(start) => start.checked_add(1).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "start too large") + })?, + ops::Bound::Unbounded => 0, + }; + let end = { + let unclamped_end = match range.end_bound() { + ops::Bound::Included(end) => end.checked_add(1).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "end too large") + })?, + ops::Bound::Excluded(&end) => end, + ops::Bound::Unbounded => len, + }; + #[allow(clippy::let_and_return)] + let clamped_end = unclamped_end.min(len); + clamped_end + }; + + if start > end { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "start past end", + )); + } + Ok(ops::Range { start, end }) + } + + #[inline(always)] + fn range_len(&self, start: u64, len: usize) -> io::Result { + let len: u64 = len.try_into().unwrap(); + let ops::Range { start, end } = self.convert_range(start..(start + len))?; + let len: u64 = end - start; + Ok(len.try_into().unwrap()) + } + } + + pub trait InputFile: FixedFile { + fn pread(&self, start: u64, buf: &mut [MaybeUninit]) -> io::Result; + + fn pread_all(&self, start: u64, buf: &mut [MaybeUninit]) -> io::Result<()> { + let len: usize = buf.len(); + let mut input_offset: u64 = start; + let mut remaining_to_read: usize = len; + + while remaining_to_read > 0 { + let num_read: usize = interruptible_buffered_io_op![ + self.pread(input_offset, &mut buf[(len - remaining_to_read)..]) + ]; + if num_read == 0 { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "pread 
less than expected range", + )); + } + assert!(num_read <= remaining_to_read); + remaining_to_read -= num_read; + let num_read_offset: u64 = num_read.try_into().unwrap(); + input_offset += num_read_offset; + } + + Ok(()) + } + } + + pub trait OutputFile: FixedFile { + fn pwrite(&mut self, start: u64, buf: &[u8]) -> io::Result; + + fn pwrite_all(&mut self, start: u64, buf: &[u8]) -> io::Result<()> { + let len: usize = buf.len(); + let mut output_offset: u64 = start; + let mut remaining_to_write: usize = len; + + while remaining_to_write > 0 { + let num_written: usize = interruptible_buffered_io_op![ + self.pwrite(output_offset, &buf[(len - remaining_to_write)..]) + ]; + if num_written == 0 { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "pwrite less than expected range", + )); + } + assert!(num_written <= remaining_to_write); + remaining_to_write -= num_written; + let num_written_offset: u64 = num_written.try_into().unwrap(); + output_offset += num_written_offset; + } + + Ok(()) + } + } + + pub trait CopyRange { + type InF: InputFile; + type OutF: OutputFile; + + fn copy_file_range( + &mut self, + from: (&Self::InF, u64), + to: (&mut Self::OutF, u64), + len: usize, + ) -> io::Result; + + fn copy_file_range_all( + &mut self, + from: (&Self::InF, u64), + mut to: (&mut Self::OutF, u64), + len: usize, + ) -> io::Result<()> { + #[allow(clippy::needless_borrow)] + let (ref from, from_offset) = from; + let (ref mut to, to_offset) = to; + + let mut remaining_to_copy: usize = len; + let mut input_offset: u64 = from_offset; + let mut output_offset: u64 = to_offset; + + while remaining_to_copy > 0 { + let num_copied: usize = interruptible_buffered_io_op![self.copy_file_range( + (from, input_offset), + (to, output_offset), + remaining_to_copy, + )]; + if num_copied == 0 { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "copied less than expected file range", + )); + } + assert!(num_copied <= remaining_to_copy); + remaining_to_copy -= num_copied; + 
let num_copied_offset: u64 = num_copied.try_into().unwrap(); + input_offset += num_copied_offset; + output_offset += num_copied_offset; + } + + Ok(()) + } + } + + #[cfg(unix)] + pub mod unix { + use super::{CopyRange, FixedFile, InputFile, OutputFile}; + + use std::fs; + use std::io; + use std::marker::PhantomData; + use std::mem::MaybeUninit; + use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd}; + use std::slice; + + use libc; + + #[derive(Debug, Copy, Clone)] + pub struct FileInput<'fd> { + handle: BorrowedFd<'fd>, + extent: u64, + } + + pub(crate) fn fstat(fd: RawFd) -> io::Result { + let fd: libc::c_int = fd; + let mut stat: MaybeUninit = MaybeUninit::uninit(); + + syscall_errno![unsafe { libc::fstat(fd, stat.as_mut_ptr()) }]; + Ok(unsafe { stat.assume_init() }) + } + + pub(crate) fn get_len(fd: RawFd) -> io::Result { + let libc::stat { st_size, .. } = fstat(fd)?; + let size: u64 = st_size.try_into().unwrap(); + Ok(size) + } + + impl<'fd> FileInput<'fd> { + pub fn new(file: &'fd impl AsFd) -> io::Result { + let handle = file.as_fd(); + let extent = get_len(handle.as_raw_fd())?; + Ok(Self { handle, extent }) + } + + pub(crate) fn fd(&self) -> RawFd { + self.handle.as_raw_fd() + } + + #[allow(dead_code)] + pub fn on_same_device(&self, to: &FileOutput) -> io::Result { + let libc::stat { + st_dev: from_dev, .. + } = fstat(self.fd())?; + let libc::stat { st_dev: to_dev, .. 
} = fstat(to.fd())?; + Ok(from_dev == to_dev) + } + } + + impl<'fd> FixedFile for FileInput<'fd> { + fn extent(&self) -> u64 { + self.extent + } + } + + impl<'fd> InputFile for FileInput<'fd> { + fn pread(&self, start: u64, buf: &mut [MaybeUninit]) -> io::Result { + let count = self.range_len(start, buf.len())?; + + let fd: libc::c_int = self.fd(); + let p: *mut libc::c_void = buf.as_mut_ptr().cast(); + let offset: libc::off_t = start.try_into().unwrap(); + + let n: usize = syscall_errno![unsafe { libc::pread(fd, p, count, offset) }] + .try_into() + .unwrap(); + Ok(n) + } + } + + pub struct FileOutput { + handle: OwnedFd, + extent: u64, + } + + impl FileOutput { + pub fn new(file: fs::File, extent: u64) -> io::Result { + file.set_len(extent)?; + Ok(Self { + handle: file.into(), + extent, + }) + } + + pub(crate) fn fd(&self) -> RawFd { + self.handle.as_raw_fd() + } + + pub fn into_file(self) -> fs::File { + self.handle.into() + } + } + + impl FixedFile for FileOutput { + fn extent(&self) -> u64 { + self.extent + } + } + + impl OutputFile for FileOutput { + fn pwrite(&mut self, start: u64, buf: &[u8]) -> io::Result { + let count = self.range_len(start, buf.len())?; + + let fd: libc::c_int = self.fd(); + let p: *const libc::c_void = buf.as_ptr().cast(); + let offset: libc::off_t = start.try_into().unwrap(); + + let n: usize = syscall_errno![unsafe { libc::pwrite(fd, p, count, offset) }] + .try_into() + .unwrap(); + Ok(n) + } + } + + pub struct FileBufferCopy<'infd, 'buf> { + buf: &'buf mut [u8], + _ph: PhantomData<&'infd u8>, + } + + impl<'infd, 'buf> FileBufferCopy<'infd, 'buf> { + pub fn new(buf: &'buf mut [u8]) -> Self { + assert!(!buf.is_empty()); + Self { + buf, + _ph: PhantomData, + } + } + } + + impl<'infd, 'buf> CopyRange for FileBufferCopy<'infd, 'buf> { + type InF = FileInput<'infd>; + type OutF = FileOutput; + + fn copy_file_range( + &mut self, + from: (&Self::InF, u64), + mut to: (&mut Self::OutF, u64), + len: usize, + ) -> io::Result { + 
#[allow(clippy::needless_borrow)] + let (ref from, from_start) = from; + let (ref mut to, to_start) = to; + + let buf_clamped_len = len.min(self.buf.len()); + let from_len = from.range_len(from_start, buf_clamped_len)?; + let to_len = to.range_len(to_start, buf_clamped_len)?; + let clamped_len = from_len.min(to_len); + if clamped_len == 0 { + return Ok(0); + } + + let clamped_buf: &'buf mut [MaybeUninit] = { + let p: *mut MaybeUninit = self.buf.as_mut_ptr().cast(); + unsafe { slice::from_raw_parts_mut(p, clamped_len) } + }; + + let num_read: usize = from.pread(from_start, clamped_buf)?; + assert!(num_read > 0); + assert!(num_read <= clamped_buf.len()); + + let result_buf: &'buf [u8] = { + let p: *const u8 = clamped_buf.as_mut_ptr().cast_const().cast(); + unsafe { slice::from_raw_parts(p, num_read) } + }; + + /* TODO: use a ring buffer instead of .pwrite_all() here! */ + to.pwrite_all(to_start, result_buf)?; + + Ok(result_buf.len()) + } + } + + #[cfg(test)] + mod test { + use super::*; + + use std::fs; + use std::io::{self, prelude::*}; + use std::mem; + + use tempfile; + + fn readable_file(input: &[u8]) -> io::Result { + let mut i = tempfile::tempfile()?; + i.write_all(input)?; + Ok(i) + } + + #[allow(clippy::missing_transmute_annotations)] + #[test] + fn pread() { + let i = readable_file(b"asdf").unwrap(); + let ii = FileInput::new(&i).unwrap(); + + let buf: MaybeUninit<[u8; 10]> = MaybeUninit::zeroed(); + let mut buf: [MaybeUninit; 10] = unsafe { mem::transmute(buf) }; + assert_eq!(2, ii.pread(0, &mut buf[..2]).unwrap()); + assert_eq!( + unsafe { mem::transmute::<_, &[u8]>(&buf[..2]) }, + b"as".as_ref() + ); + assert_eq!(3, ii.pread(1, &mut buf[4..]).unwrap()); + assert_eq!( + unsafe { mem::transmute::<_, &[u8]>(&buf[..]) }, + &[b'a', b's', 0, 0, b's', b'd', b'f', 0, 0, 0] + ); + } + + #[test] + fn pwrite() { + let o = tempfile::tempfile().unwrap(); + let mut oo = FileOutput::new(o, 10).unwrap(); + + let i = b"asdf"; + assert_eq!(2, oo.pwrite(0, 
&i[..2]).unwrap()); + assert_eq!(3, oo.pwrite(4, &i[1..]).unwrap()); + assert_eq!(1, oo.pwrite(9, &i[..]).unwrap()); + + let mut o = oo.into_file(); + o.rewind().unwrap(); + let mut buf = Vec::new(); + o.read_to_end(&mut buf).unwrap(); + assert_eq!(&buf[..], &[b'a', b's', 0, 0, b's', b'd', b'f', 0, 0, b'a']); + } + + #[test] + fn copy_file_range() { + let i = readable_file(b"asdf").unwrap(); + let ii = FileInput::new(&i).unwrap(); + + let o = tempfile::tempfile().unwrap(); + let mut oo = FileOutput::new(o, 10).unwrap(); + + /* Buffer is size 2, which limits the max size of individual copy_file_range() + * calls. */ + let mut buf = vec![0u8; 2].into_boxed_slice(); + + let mut c = FileBufferCopy::new(&mut buf); + assert_eq!(2, c.copy_file_range((&ii, 0), (&mut oo, 0), 2).unwrap()); + assert_eq!(2, c.copy_file_range((&ii, 1), (&mut oo, 4), 20).unwrap()); + assert_eq!(1, c.copy_file_range((&ii, 0), (&mut oo, 9), 35).unwrap()); + + let mut o = oo.into_file(); + o.rewind().unwrap(); + let mut buf = Vec::new(); + o.read_to_end(&mut buf).unwrap(); + + assert_eq!(&buf[..], &[b'a', b's', 0, 0, b's', b'd', 0, 0, 0, b'a']); + } + } + } + + #[cfg(target_os = "linux")] + pub mod linux { + use super::unix::{FileInput, FileOutput}; + use super::{CopyRange, FixedFile}; + + use std::io; + use std::marker::PhantomData; + + use libc; + + pub struct FileCopy<'infd>(PhantomData<&'infd u8>); + + impl<'infd> FileCopy<'infd> { + pub const fn new() -> Self { + Self(PhantomData) + } + } + + impl<'infd> CopyRange for FileCopy<'infd> { + type InF = FileInput<'infd>; + type OutF = FileOutput; + + fn copy_file_range( + &mut self, + from: (&Self::InF, u64), + to: (&mut Self::OutF, u64), + len: usize, + ) -> io::Result { + let (from, from_start) = from; + let (to, to_start) = to; + + let from_len = from.range_len(from_start, len)?; + let to_len = to.range_len(to_start, len)?; + let clamped_len = from_len.min(to_len); + + let from_fd: libc::c_int = from.fd(); + let mut from_offset: libc::off64_t = 
from_start.try_into().unwrap(); + let to_fd: libc::c_int = to.fd(); + let mut to_offset: libc::off64_t = to_start.try_into().unwrap(); + + let flags: libc::c_uint = 0; + + let n: usize = syscall_errno![unsafe { + libc::copy_file_range( + from_fd, + &mut from_offset, + to_fd, + &mut to_offset, + clamped_len, + flags, + ) + }] + .try_into() + .unwrap(); + Ok(n) + } + } + + #[cfg(test)] + mod test { + use super::*; + + use std::fs; + use std::io::{self, prelude::*}; + + use tempfile; + + fn readable_file(input: &[u8]) -> io::Result { + let mut i = tempfile::tempfile()?; + i.write_all(input)?; + Ok(i) + } + + #[test] + fn copy_file_range() { + let i = readable_file(b"asdf").unwrap(); + let ii = FileInput::new(&i).unwrap(); + + let o = tempfile::tempfile().unwrap(); + let mut oo = FileOutput::new(o, 10).unwrap(); + + let mut c = FileCopy::new(); + assert_eq!(2, c.copy_file_range((&ii, 0), (&mut oo, 0), 2).unwrap()); + assert_eq!(3, c.copy_file_range((&ii, 1), (&mut oo, 4), 20).unwrap()); + assert_eq!(1, c.copy_file_range((&ii, 0), (&mut oo, 9), 35).unwrap()); + + let mut o = oo.into_file(); + o.rewind().unwrap(); + let mut buf = Vec::new(); + o.read_to_end(&mut buf).unwrap(); + + assert_eq!(&buf[..], &[b'a', b's', 0, 0, b's', b'd', b'f', 0, 0, b'a']); + } + } + } +} + +pub mod pipe { + use super::file::{InputFile, OutputFile}; + + use std::io; + + #[allow(dead_code)] + pub trait WriteEnd: io::Write {} + + pub trait WriteSplicer { + type InF: InputFile; + type OutP: WriteEnd; + + fn splice_from_file( + &mut self, + from: (&Self::InF, u64), + to: &mut Self::OutP, + len: usize, + ) -> io::Result; + + fn splice_from_file_all( + &mut self, + from: (&Self::InF, u64), + to: &mut Self::OutP, + len: usize, + ) -> io::Result<()> { + #[allow(clippy::needless_borrow)] + let (ref from, from_offset) = from; + + let mut remaining_to_read: usize = len; + let mut input_offset: u64 = from_offset; + while remaining_to_read > 0 { + let num_read: usize = 
interruptible_buffered_io_op![self.splice_from_file( + (from, input_offset), + to, + remaining_to_read + )]; + if num_read == 0 { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "spliced less than expected range from file", + )); + } + assert!(num_read <= remaining_to_read); + remaining_to_read -= num_read; + let num_read_offset: u64 = num_read.try_into().unwrap(); + input_offset += num_read_offset; + } + + Ok(()) + } + } + + #[allow(dead_code)] + pub trait ReadEnd: io::Read {} + + pub trait ReadSplicer { + type InP: ReadEnd; + type OutF: OutputFile; + + fn splice_to_file( + &mut self, + from: &mut Self::InP, + to: (&mut Self::OutF, u64), + len: usize, + ) -> io::Result; + + fn splice_to_file_all( + &mut self, + from: &mut Self::InP, + mut to: (&mut Self::OutF, u64), + len: usize, + ) -> io::Result<()> { + let (ref mut to, to_offset) = to; + + let mut remaining_to_write: usize = len; + let mut output_offset: u64 = to_offset; + while remaining_to_write > 0 { + let num_written: usize = interruptible_buffered_io_op![self.splice_to_file( + from, + (to, output_offset), + remaining_to_write + )]; + if num_written == 0 { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "spliced less than expected range to file", + )); + } + assert!(num_written <= remaining_to_write); + remaining_to_write -= num_written; + let num_written_offset: u64 = num_written.try_into().unwrap(); + output_offset += num_written_offset; + } + + Ok(()) + } + } + + #[cfg(unix)] + pub mod unix { + use super::{ReadEnd, ReadSplicer, WriteEnd, WriteSplicer}; + + use crate::read::split::file::unix::{FileInput, FileOutput}; + use crate::read::split::file::{FixedFile, InputFile, OutputFile}; + + use std::io::{self, Read, Write}; + use std::marker::PhantomData; + use std::mem::MaybeUninit; + use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}; + use std::slice; + + use libc; + + pub struct WritePipe { + handle: OwnedFd, + } + + impl WritePipe { + pub(crate) unsafe fn from_fd(fd: RawFd) 
-> Self { + Self { + handle: OwnedFd::from_raw_fd(fd), + } + } + + pub(crate) fn fd(&self) -> RawFd { + self.handle.as_raw_fd() + } + } + + impl io::Write for WritePipe { + fn write(&mut self, buf: &[u8]) -> io::Result { + let fd: libc::c_int = self.fd(); + + /* TODO: use vmsplice() instead on linux! However, UB results if the buffer is + * modified before the data is read by the output: see + * https://stackoverflow.com/questions/70515745/how-do-i-use-vmsplice-to-correctly-output-to-a-pipe. + * This may be possible to handle with some sort of ring buffer, but for now let's + * take the hit and avoid race conditions by using write() on all unix-likes. */ + let n: usize = + syscall_errno![unsafe { libc::write(fd, buf.as_ptr().cast(), buf.len()) }] + .try_into() + .unwrap(); + Ok(n) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + impl WriteEnd for WritePipe {} + + pub struct ReadPipe { + handle: OwnedFd, + } + + impl ReadPipe { + pub(crate) unsafe fn from_fd(fd: RawFd) -> Self { + Self { + handle: OwnedFd::from_raw_fd(fd), + } + } + + pub(crate) fn fd(&self) -> RawFd { + self.handle.as_raw_fd() + } + } + + impl io::Read for ReadPipe { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let fd: libc::c_int = self.fd(); + + /* TODO: vmsplice() on linux currently offers no additional optimization for reads, + * so just use read() on all platforms. Also note as in WritePipe::write() that + * some sort of ring buffer is probably necessary to avoid race conditions if this + * optimization is performed. 
*/ + let n: usize = + syscall_errno![unsafe { libc::read(fd, buf.as_mut_ptr().cast(), buf.len()) }] + .try_into() + .unwrap(); + Ok(n) + } + } + + impl ReadEnd for ReadPipe {} + + pub fn create_pipe() -> io::Result<(ReadPipe, WritePipe)> { + let mut fds: [libc::c_int; 2] = [0; 2]; + syscall_errno![unsafe { libc::pipe(fds.as_mut_ptr()) }]; + let [r, w] = fds; + let (r, w) = unsafe { (ReadPipe::from_fd(r), WritePipe::from_fd(w)) }; + Ok((r, w)) + } + + pub struct PipeWriteBufferSplicer<'infd, 'buf> { + buf: &'buf mut [u8], + _ph: PhantomData<&'infd u8>, + } + + impl<'infd, 'buf> PipeWriteBufferSplicer<'infd, 'buf> { + #[allow(dead_code)] + pub fn new(buf: &'buf mut [u8]) -> Self { + assert!(!buf.is_empty()); + Self { + buf, + _ph: PhantomData, + } + } + } + + impl<'infd, 'buf> WriteSplicer for PipeWriteBufferSplicer<'infd, 'buf> { + type InF = FileInput<'infd>; + type OutP = WritePipe; + + fn splice_from_file( + &mut self, + from: (&Self::InF, u64), + to: &mut Self::OutP, + len: usize, + ) -> io::Result { + #[allow(clippy::needless_borrow)] + let (ref from, from_start) = from; + + let buf_clamped_len = len.min(self.buf.len()); + let from_len = from.range_len(from_start, buf_clamped_len)?; + let clamped_len = from_len; + if clamped_len == 0 { + return Ok(0); + } + + let clamped_buf: &'buf mut [MaybeUninit] = { + let p: *mut MaybeUninit = self.buf.as_mut_ptr().cast(); + unsafe { slice::from_raw_parts_mut(p, clamped_len) } + }; + + let num_read: usize = from.pread(from_start, clamped_buf)?; + assert!(num_read > 0); + assert!(num_read <= clamped_buf.len()); + + let result_buf: &'buf [u8] = { + let p: *const u8 = clamped_buf.as_mut_ptr().cast_const().cast(); + unsafe { slice::from_raw_parts(p, num_read) } + }; + + /* TODO: use a ring buffer instead of .write_all() here! 
*/ + to.write_all(result_buf)?; + + Ok(result_buf.len()) + } + } + + pub struct PipeReadBufferSplicer<'buf> { + buf: &'buf mut [u8], + } + + impl<'buf> PipeReadBufferSplicer<'buf> { + #[allow(dead_code)] + pub fn new(buf: &'buf mut [u8]) -> Self { + assert!(!buf.is_empty()); + Self { buf } + } + } + + impl<'buf> ReadSplicer for PipeReadBufferSplicer<'buf> { + type InP = ReadPipe; + type OutF = FileOutput; + + fn splice_to_file( + &mut self, + from: &mut Self::InP, + mut to: (&mut Self::OutF, u64), + len: usize, + ) -> io::Result { + let (ref mut to, to_start) = to; + + let buf_clamped_len = len.min(self.buf.len()); + let to_len = to.range_len(to_start, buf_clamped_len)?; + let clamped_len = to_len; + if clamped_len == 0 { + return Ok(0); + } + + let clamped_buf: &'buf mut [u8] = + unsafe { slice::from_raw_parts_mut(self.buf.as_mut_ptr(), clamped_len) }; + + let num_read: usize = from.read(clamped_buf)?; + if num_read == 0 { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "expected to read nonzero from blocking pipe", + )); + } + assert!(num_read <= clamped_buf.len()); + + let result_buf: &'buf [u8] = unsafe { + slice::from_raw_parts(clamped_buf.as_mut_ptr().cast_const(), num_read) + }; + + /* TODO: use a ring buffer instead of .pwrite_all() here! */ + to.pwrite_all(to_start, result_buf)?; + + Ok(result_buf.len()) + } + } + + #[cfg(test)] + mod test { + use super::*; + + use std::fs; + use std::io::prelude::*; + use std::thread; + + use tempfile; + + fn readable_file(input: &[u8]) -> io::Result { + let mut i = tempfile::tempfile()?; + i.write_all(input)?; + Ok(i) + } + + #[test] + fn read_write_pipe() { + let (mut r, mut w) = create_pipe().unwrap(); + + let t = thread::spawn(move || w.write_all(b"asdf")); + /* The write end is dropped after the string is written, which stops .read_to_end() + * from blocking. 
*/ + let mut buf: Vec = Vec::new(); + r.read_to_end(&mut buf).unwrap(); + assert_eq!(b"asdf".as_ref(), &buf[..]); + t.join().unwrap().unwrap(); + } + + #[test] + fn splice_from_file() { + let (mut r, mut w) = create_pipe().unwrap(); + + let t = thread::spawn(move || { + let i = readable_file(b"asdf").unwrap(); + let ii = FileInput::new(&i).unwrap(); + /* Buffer is size 2, which limits the max size of individual splice() calls. */ + let mut buf = vec![0u8; 2].into_boxed_slice(); + let mut s = PipeWriteBufferSplicer::new(&mut buf); + s.splice_from_file((&ii, 1), &mut w, 13) + }); + + let mut buf: Vec = Vec::new(); + r.read_to_end(&mut buf).unwrap(); + /* Started from offset 1, and buf limited to 2, so only get 2 chars. */ + assert_eq!(b"sd".as_ref(), &buf[..]); + assert_eq!(2, t.join().unwrap().unwrap()); + } + + #[test] + fn splice_to_file() { + let o = tempfile::tempfile().unwrap(); + let mut oo = FileOutput::new(o, 5).unwrap(); + + let (mut r, mut w) = create_pipe().unwrap(); + let t = thread::spawn(move || w.write_all(b"asdfasdf")); + + /* Buffer is size 2, which limits the max size of individual splice() calls. */ + let mut buf = vec![0u8; 2].into_boxed_slice(); + let mut s = PipeReadBufferSplicer::new(&mut buf); + assert_eq!(2, s.splice_to_file(&mut r, (&mut oo, 2), 13).unwrap()); + + let mut o = oo.into_file(); + o.rewind().unwrap(); + let mut buf: Vec = Vec::new(); + o.read_to_end(&mut buf).unwrap(); + + /* Started from offset 2, and buf limited to 2, so only get 2 chars. */ + assert_eq!(&buf[..], &[0, 0, b'a', b's', 0]); + + /* Get remaining chars written. 
*/ + buf.clear(); + r.read_to_end(&mut buf).unwrap(); + assert_eq!(&buf[..], b"dfasdf".as_ref()); + + t.join().unwrap().unwrap(); + } + } + } + + #[cfg(target_os = "linux")] + pub mod linux { + use super::unix::{ReadPipe, WritePipe}; + use super::{ReadSplicer, WriteSplicer}; + + use crate::read::split::file::unix::{FileInput, FileOutput}; + use crate::read::split::file::FixedFile; + + use std::io; + use std::marker::PhantomData; + use std::ptr; + + use libc; + + pub struct PipeWriteSplicer<'infd>(PhantomData<&'infd u8>); + + impl<'infd> PipeWriteSplicer<'infd> { + pub const fn new() -> Self { + Self(PhantomData) + } + } + + impl<'infd> WriteSplicer for PipeWriteSplicer<'infd> { + type InF = FileInput<'infd>; + type OutP = WritePipe; + + fn splice_from_file( + &mut self, + from: (&Self::InF, u64), + to: &mut Self::OutP, + len: usize, + ) -> io::Result { + let (from, from_start) = from; + + let count = from.range_len(from_start, len)?; + + let from_fd: libc::c_int = from.fd(); + let mut from_offset: libc::loff_t = from_start.try_into().unwrap(); + let to_fd: libc::c_int = to.fd(); + + let flags: libc::c_uint = 0; + let n: usize = syscall_errno![unsafe { + libc::splice( + from_fd, + &mut from_offset, + to_fd, + ptr::null_mut(), + count, + flags, + ) + }] + .try_into() + .unwrap(); + Ok(n) + } + } + + pub struct PipeReadSplicer; + + impl ReadSplicer for PipeReadSplicer { + type InP = ReadPipe; + type OutF = FileOutput; + + fn splice_to_file( + &mut self, + from: &mut Self::InP, + to: (&mut Self::OutF, u64), + len: usize, + ) -> io::Result { + let (to, to_start) = to; + + let count = to.range_len(to_start, len)?; + + let from_fd: libc::c_int = from.fd(); + let to_fd: libc::c_int = to.fd(); + let mut to_offset: libc::loff_t = to_start.try_into().unwrap(); + + let flags: libc::c_uint = 0; + let n: usize = syscall_errno![unsafe { + libc::splice( + from_fd, + ptr::null_mut(), + to_fd, + &mut to_offset, + count, + flags, + ) + }] + .try_into() + .unwrap(); + Ok(n) + } + } + 
+ #[cfg(test)] + mod test { + use super::super::unix::create_pipe; + use super::*; + + use std::fs; + use std::io::prelude::*; + use std::thread; + + use tempfile; + + fn readable_file(input: &[u8]) -> io::Result { + let mut i = tempfile::tempfile()?; + i.write_all(input)?; + Ok(i) + } + + #[test] + fn splice_from_file() { + let (mut r, mut w) = create_pipe().unwrap(); + let t = thread::spawn(move || { + let i = readable_file(b"asdf").unwrap(); + let ii = FileInput::new(&i).unwrap(); + let mut s = PipeWriteSplicer::new(); + s.splice_from_file((&ii, 1), &mut w, 13) + }); + + let mut buf: Vec = Vec::new(); + r.read_to_end(&mut buf).unwrap(); + /* Started from offset 1, so only get 3 chars. */ + assert_eq!(b"sdf".as_ref(), &buf[..]); + assert_eq!(3, t.join().unwrap().unwrap()); + } + + #[test] + fn splice_to_file() { + let o = tempfile::tempfile().unwrap(); + let mut oo = FileOutput::new(o, 5).unwrap(); + + let (mut r, mut w) = create_pipe().unwrap(); + let t = thread::spawn(move || w.write_all(b"asdfasdf")); + + let mut s = PipeReadSplicer; + assert_eq!(3, s.splice_to_file(&mut r, (&mut oo, 2), 13).unwrap()); + + let mut o = oo.into_file(); + o.rewind().unwrap(); + let mut buf: Vec = Vec::new(); + o.read_to_end(&mut buf).unwrap(); + + /* Started from offset 2, so only get 3 chars. */ + assert_eq!(&buf[..], &[0, 0, b'a', b's', b'd']); + + /* Get remaining chars written. 
*/ + buf.clear(); + r.read_to_end(&mut buf).unwrap(); + assert_eq!(&buf[..], b"fasdf".as_ref()); + + t.join().unwrap().unwrap(); + } + } + } +} + +pub mod util { + use std::io::{self, Read, Write}; + + pub struct TakeWrite { + inner: W, + limit: u64, + } + + impl TakeWrite { + pub const fn take(inner: W, limit: u64) -> Self { + Self { inner, limit } + } + + #[allow(dead_code)] + #[inline(always)] + pub const fn limit(&self) -> u64 { + self.limit + } + + #[allow(dead_code)] + pub fn into_inner(self) -> W { + self.inner + } + } + + impl Write for TakeWrite + where + W: Write, + { + fn write(&mut self, buf: &[u8]) -> io::Result { + if self.limit == 0 { + return Ok(0); + } + + let buf_len: u64 = buf.len().try_into().unwrap(); + let to_write_offset: u64 = buf_len.min(self.limit); + let to_write: usize = to_write_offset.try_into().unwrap(); + + let num_written: usize = self.inner.write(&buf[..to_write])?; + assert!(num_written <= to_write); + let num_written_offset: u64 = num_written.try_into().unwrap(); + self.limit -= num_written_offset; + Ok(num_written) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } + } + + pub fn copy_via_buf(r: &mut R, w: &mut W, buf: &mut [u8]) -> io::Result + where + R: Read + ?Sized, + W: Write + ?Sized, + { + assert!(!buf.is_empty()); + let mut total_copied: u64 = 0; + + loop { + let num_read: usize = interruptible_buffered_io_op![r.read(buf)]; + if num_read == 0 { + break; + } + let num_read_offset: u64 = num_read.try_into().unwrap(); + + /* TODO: use a ring buffer instead of .write_all() here! 
*/ + w.write_all(&buf[..num_read])?; + total_copied += num_read_offset; + } + + Ok(total_copied) + } + + #[cfg(test)] + mod test { + use super::*; + + use tempfile; + + use std::fs; + use std::io::{self, Cursor, Seek}; + + fn readable_file(input: &[u8]) -> io::Result { + let mut i = tempfile::tempfile()?; + i.write_all(input)?; + i.rewind()?; + Ok(i) + } + + #[test] + fn take_write_copy() { + let mut i = readable_file(b"asdf".as_ref()).unwrap(); + let out = Cursor::new(Vec::new()); + let mut limited = TakeWrite::take(out, 3); + assert_eq!(3, limited.limit()); + + let mut buf = [0u8; 15]; + + assert_eq!( + io::ErrorKind::WriteZero, + copy_via_buf(&mut i, &mut limited, &mut buf[..]) + .err() + .unwrap() + .kind() + ); + assert_eq!(0, limited.limit()); + let out = limited.into_inner().into_inner(); + assert_eq!(&out[..], b"asd".as_ref()); + } + } +} diff --git a/src/spec.rs b/src/spec.rs index 3d5318872..85d3da528 100644 --- a/src/spec.rs +++ b/src/spec.rs @@ -174,6 +174,18 @@ pub(crate) trait FixedSizeBlock: Pod { #[allow(clippy::wrong_self_convention)] fn from_le(self) -> Self; + #[allow(dead_code)] + fn interpret(input_block: &[u8]) -> ZipResult { + let mut block = Self::zeroed(); + block.as_bytes_mut().copy_from_slice(input_block); + let block = Self::from_le(block); + + if block.magic() != Self::MAGIC { + return Err(Self::WRONG_MAGIC_ERROR); + } + Ok(block) + } + fn parse(reader: &mut R) -> ZipResult { let mut block = Self::zeroed(); reader.read_exact(block.as_bytes_mut())?; diff --git a/src/types.rs b/src/types.rs index 39933b54b..231c7bd73 100644 --- a/src/types.rs +++ b/src/types.rs @@ -566,6 +566,20 @@ impl ZipFileData { } } + #[allow(dead_code)] + pub(crate) fn is_symlink(&self) -> bool { + self.unix_mode() + /* TODO: could this just be != 0? */ + .is_some_and(|mode| mode & ffi::S_IFLNK == ffi::S_IFLNK) + } + + #[allow(dead_code)] + pub(crate) fn is_dir_by_mode(&self) -> bool { + self.unix_mode() + /* TODO: could this just be != 0? 
*/ + .is_some_and(|mode| mode & ffi::S_IFDIR == ffi::S_IFDIR) + } + /// PKZIP version needed to open this file (from APPNOTE 4.4.3.2). pub fn version_needed(&self) -> u16 { let compression_version: u16 = match self.compression_method { diff --git a/src/write.rs b/src/write.rs index 96b72a2f3..1d516f5a0 100644 --- a/src/write.rs +++ b/src/write.rs @@ -536,10 +536,10 @@ impl FileOptions<'_, ExtendedFileOptions> { /// Removes the extra data fields. #[must_use] pub fn clear_extra_data(mut self) -> Self { - if self.extended_options.extra_data.len() > 0 { + if !self.extended_options.extra_data.is_empty() { self.extended_options.extra_data = Arc::new(vec![]); } - if self.extended_options.central_extra_data.len() > 0 { + if !self.extended_options.central_extra_data.is_empty() { self.extended_options.central_extra_data = Arc::new(vec![]); } self diff --git a/tests/data/stored-and-compressed-text.zip b/tests/data/stored-and-compressed-text.zip new file mode 100644 index 000000000..b17501747 Binary files /dev/null and b/tests/data/stored-and-compressed-text.zip differ diff --git a/tests/repro_old423.rs b/tests/repro_old423.rs index f87245e5b..4e8963a99 100644 --- a/tests/repro_old423.rs +++ b/tests/repro_old423.rs @@ -10,3 +10,18 @@ fn repro_old423() -> zip::result::ZipResult<()> { let mut archive = ZipArchive::new(io::Cursor::new(v)).expect("couldn't open test zip file"); archive.extract(TempDir::with_prefix("repro_old423")?) } + +#[cfg(all(unix, feature = "parallelism", feature = "_deflate-any"))] +#[test] +fn repro_old423_pipelined() -> zip::result::ZipResult<()> { + use std::{fs, path::Path}; + use tempdir::TempDir; + use zip::{read::split_extract, ZipArchive}; + + let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/data/lin-ub_iwd-v11.zip"); + let file = fs::File::open(path)?; + let archive = ZipArchive::new(file)?; + let td = TempDir::new("repro_old423")?; + split_extract(&archive, td.path(), Default::default()).expect("couldn't extract test zip"); + Ok(()) +}