diff --git a/Cargo.lock b/Cargo.lock index 0d403b3e288e..919e9a240121 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1183,9 +1183,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.22" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" +checksum = "b553656127a00601c8ae5590fcfdc118e4083a7924b6cf4ffc1ea4b99dc429d7" dependencies = [ "bytes", "fnv", @@ -2532,12 +2532,14 @@ dependencies = [ name = "puffin-extract" version = "0.0.1" dependencies = [ + "async_zip", "flate2", "fs-err", "rayon", "tar", "thiserror", "tokio", + "tokio-util", "zip", ] diff --git a/crates/puffin-distribution/src/distribution_database.rs b/crates/puffin-distribution/src/distribution_database.rs index 08aecdbd8017..3baa5a9cc65c 100644 --- a/crates/puffin-distribution/src/distribution_database.rs +++ b/crates/puffin-distribution/src/distribution_database.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use bytesize::ByteSize; use fs_err::tokio as fs; +use puffin_extract::unzip_no_seek; use thiserror::Error; use tokio::task::JoinError; use tokio_util::compat::FuturesAsyncReadCompatExt; @@ -21,7 +22,7 @@ use puffin_git::GitSource; use puffin_traits::BuildContext; use pypi_types::Metadata21; -use crate::download::BuiltWheel; +use crate::download::{BuiltWheel, UnzippedWheel}; use crate::locks::Locks; use crate::reporter::Facade; use crate::{ @@ -37,6 +38,8 @@ pub enum DistributionDatabaseError { #[error(transparent)] Client(#[from] puffin_client::Error), #[error(transparent)] + Extract(#[from] puffin_extract::Error), + #[error(transparent)] Io(#[from] io::Error), #[error(transparent)] Distribution(#[from] distribution_types::Error), @@ -108,30 +111,62 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> ) -> Result { match &dist { Dist::Built(BuiltDist::Registry(wheel)) => { - // Fetch the wheel. 
let url = wheel .base .join_relative(&wheel.file.url) .map_err(|err| DistributionDatabaseError::Url(wheel.file.url.clone(), err))?; + // Make cache entry let wheel_filename = WheelFilename::from_str(&wheel.file.filename)?; + let cache_entry = self.cache.entry( + CacheBucket::Wheels, + WheelCache::Index(&wheel.index).remote_wheel_dir(wheel_filename.name.as_ref()), + wheel_filename.stem(), + ); + // Start the download let reader = self.client.stream_external(&url).await?; + // In all wheels we've seen so far, unzipping while downloading is the + // faster option. + // + // Writing to a file first may be faster if the wheel takes longer to + // unzip than it takes to download. This may happen if the wheel is a + // zip bomb, or if the machine has a weak cpu (with many cores), but a + // fast network. + // + // If we find such a case, it may make sense to create separate tasks + // for downloading and unzipping (with a buffer in between) and switch + // to rayon if this buffer grows large by the time the file is fully + // downloaded. + let unzip_while_downloading = true; + if unzip_while_downloading { + // Download and unzip to a temporary dir + let temp_dir = tempfile::tempdir_in(self.cache.root())?; + let temp_target = temp_dir.path().join(&wheel.file.filename); + unzip_no_seek(reader.compat(), &temp_target).await?; + + // Move the dir to the right place + fs::create_dir_all(&cache_entry.dir()).await?; + let target = cache_entry.into_path_buf(); + tokio::fs::rename(temp_target, &target).await?; + + return Ok(LocalWheel::Unzipped(UnzippedWheel { + dist: dist.clone(), + target, + filename: wheel_filename, + })); + } + // If the file is greater than 5MB, write it to disk; otherwise, keep it in memory. + // + // TODO this is currently dead code. Consider deleting if there's no use for it. 
let byte_size = wheel.file.size.map(ByteSize::b); let local_wheel = if let Some(byte_size) = byte_size.filter(|byte_size| *byte_size < ByteSize::mb(5)) { debug!("Fetching in-memory wheel from registry: {dist} ({byte_size})",); - let cache_entry = self.cache.entry( - CacheBucket::Wheels, - WheelCache::Index(&wheel.index) - .remote_wheel_dir(wheel_filename.name.as_ref()), - wheel_filename.stem(), - ); - // Read into a buffer. let mut buffer = Vec::with_capacity( wheel @@ -170,7 +205,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> CacheBucket::Wheels, WheelCache::Index(&wheel.index) .remote_wheel_dir(wheel_filename.name.as_ref()), - filename, + filename, // TODO should this be filename.stem() to match the other branch? ); fs::create_dir_all(&cache_entry.dir()).await?; tokio::fs::rename(temp_file, &cache_entry.path()).await?; @@ -193,31 +228,25 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> debug!("Fetching disk-based wheel from URL: {}", wheel.url); let reader = self.client.stream_external(&wheel.url).await?; - let filename = wheel.filename.to_string(); - // Download the wheel to a temporary file. + // Download and unzip the wheel to a temporary dir. let temp_dir = tempfile::tempdir_in(self.cache.root())?; - let temp_file = temp_dir.path().join(&filename); - let mut writer = - tokio::io::BufWriter::new(tokio::fs::File::create(&temp_file).await?); - tokio::io::copy(&mut reader.compat(), &mut writer).await?; + let temp_target = temp_dir.path().join(wheel.filename.to_string()); + unzip_no_seek(reader.compat(), &temp_target).await?; // Move the temporary file to the cache. 
let cache_entry = self.cache.entry( CacheBucket::Wheels, WheelCache::Url(&wheel.url).remote_wheel_dir(wheel.name().as_ref()), - filename, + wheel.filename.stem(), ); fs::create_dir_all(&cache_entry.dir()).await?; - tokio::fs::rename(temp_file, &cache_entry.path()).await?; + let target = cache_entry.into_path_buf(); + tokio::fs::rename(temp_target, &target).await?; - let local_wheel = LocalWheel::Disk(DiskWheel { + let local_wheel = LocalWheel::Unzipped(UnzippedWheel { dist: dist.clone(), - target: cache_entry - .with_file(wheel.filename.stem()) - .path() - .to_path_buf(), - path: cache_entry.into_path_buf(), + target, filename: wheel.filename.clone(), }); diff --git a/crates/puffin-distribution/src/download.rs b/crates/puffin-distribution/src/download.rs index 47dff95bf451..b3ca4a98d8ee 100644 --- a/crates/puffin-distribution/src/download.rs +++ b/crates/puffin-distribution/src/download.rs @@ -10,6 +10,17 @@ use pypi_types::Metadata21; use crate::error::Error; +/// A wheel that's been unzipped while downloading +#[derive(Debug, Clone)] +pub struct UnzippedWheel { + /// The remote distribution from which this wheel was downloaded. + pub(crate) dist: Dist, + /// The parsed filename. + pub(crate) filename: WheelFilename, + /// The path in the cache dir where the wheel was downloaded. + pub(crate) target: PathBuf, +} + /// A downloaded wheel that's stored in-memory. #[derive(Debug, Clone)] pub struct InMemoryWheel { @@ -52,6 +63,7 @@ pub struct BuiltWheel { /// A downloaded or built wheel. #[derive(Debug, Clone)] pub enum LocalWheel { + Unzipped(UnzippedWheel), InMemory(InMemoryWheel), Disk(DiskWheel), Built(BuiltWheel), @@ -61,6 +73,7 @@ impl LocalWheel { /// Return the path to the downloaded wheel's entry in the cache. 
pub fn target(&self) -> &Path { match self { + LocalWheel::Unzipped(wheel) => &wheel.target, LocalWheel::InMemory(wheel) => &wheel.target, LocalWheel::Disk(wheel) => &wheel.target, LocalWheel::Built(wheel) => &wheel.target, @@ -70,6 +83,7 @@ impl LocalWheel { /// Return the [`Dist`] from which this wheel was downloaded. pub fn remote(&self) -> &Dist { match self { + LocalWheel::Unzipped(wheel) => wheel.remote(), LocalWheel::InMemory(wheel) => wheel.remote(), LocalWheel::Disk(wheel) => wheel.remote(), LocalWheel::Built(wheel) => wheel.remote(), @@ -79,6 +93,7 @@ impl LocalWheel { /// Return the [`WheelFilename`] of this wheel. pub fn filename(&self) -> &WheelFilename { match self { + LocalWheel::Unzipped(wheel) => &wheel.filename, LocalWheel::InMemory(wheel) => &wheel.filename, LocalWheel::Disk(wheel) => &wheel.filename, LocalWheel::Built(wheel) => &wheel.filename, @@ -86,6 +101,13 @@ impl LocalWheel { } } +impl UnzippedWheel { + /// Return the [`Dist`] from which this wheel was downloaded. + pub fn remote(&self) -> &Dist { + &self.dist + } +} + impl DiskWheel { /// Return the [`Dist`] from which this wheel was downloaded. 
pub fn remote(&self) -> &Dist { diff --git a/crates/puffin-distribution/src/unzip.rs b/crates/puffin-distribution/src/unzip.rs index d7192605bbea..6708cc5765d2 100644 --- a/crates/puffin-distribution/src/unzip.rs +++ b/crates/puffin-distribution/src/unzip.rs @@ -31,6 +31,7 @@ impl Unzip for BuiltWheel { impl Unzip for LocalWheel { fn unzip(&self, target: &Path) -> Result<(), Error> { match self { + LocalWheel::Unzipped(_) => Ok(()), LocalWheel::InMemory(wheel) => wheel.unzip(target), LocalWheel::Disk(wheel) => wheel.unzip(target), LocalWheel::Built(wheel) => wheel.unzip(target), diff --git a/crates/puffin-extract/Cargo.toml b/crates/puffin-extract/Cargo.toml index d34889118ef9..86eb5a110fd5 100644 --- a/crates/puffin-extract/Cargo.toml +++ b/crates/puffin-extract/Cargo.toml @@ -13,6 +13,8 @@ license = { workspace = true } workspace = true [dependencies] +tokio-util = { workspace = true, features = ["compat"] } +async_zip = { workspace = true, features = ["tokio"] } flate2 = { workspace = true } fs-err = { workspace = true } rayon = { workspace = true } diff --git a/crates/puffin-extract/src/lib.rs b/crates/puffin-extract/src/lib.rs index 02cccdd92871..4a40b5b69ca6 100644 --- a/crates/puffin-extract/src/lib.rs +++ b/crates/puffin-extract/src/lib.rs @@ -1,6 +1,7 @@ use std::path::{Path, PathBuf}; use rayon::prelude::*; +use tokio_util::compat::FuturesAsyncReadCompatExt; use zip::result::ZipError; use zip::ZipArchive; @@ -13,6 +14,8 @@ pub enum Error { #[error(transparent)] Zip(#[from] ZipError), #[error(transparent)] + AsyncZip(#[from] async_zip::error::ZipError), + #[error(transparent)] Io(#[from] std::io::Error), #[error("Unsupported archive type: {0}")] UnsupportedArchive(PathBuf), @@ -22,6 +25,44 @@ pub enum Error { InvalidArchive(Vec), } +/// Unzip a `.zip` archive into the target directory without requiring Seek. +/// +/// This is useful for unzipping files as they're being downloaded. 
If the archive +/// is already fully on disk, consider using `unzip_archive`, which can use multiple +/// threads to work faster in that case. +pub async fn unzip_no_seek( + reader: R, + target: &Path, +) -> Result<(), Error> { + let mut zip = async_zip::base::read::stream::ZipFileReader::with_tokio(reader); + + while let Some(mut entry) = zip.next_with_entry().await? { + // Construct path + let path = entry.reader().entry().filename().as_str()?; + let path = target.join(path); + let is_dir = entry.reader().entry().dir()?; + + // Create dir or write file + if is_dir { + tokio::fs::create_dir_all(path).await?; + } else { + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let file = tokio::fs::File::create(path).await?; + let mut writer = tokio::io::BufWriter::new(file); + let mut reader = entry.reader_mut().compat(); + tokio::io::copy(&mut reader, &mut writer).await?; + } + + // Close current file to get access to the next one. See docs: + // https://docs.rs/async_zip/0.0.16/async_zip/base/read/stream/ + zip = entry.skip().await?; + } + + Ok(()) +} + /// Unzip a `.zip` archive into the target directory. pub fn unzip_archive( reader: R, diff --git a/crates/puffin-installer/src/downloader.rs b/crates/puffin-installer/src/downloader.rs index 0dfbcc0cdbe6..9e36bef7134d 100644 --- a/crates/puffin-installer/src/downloader.rs +++ b/crates/puffin-installer/src/downloader.rs @@ -205,33 +205,39 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { } // Unzip the wheel. - let normalized_path = tokio::task::spawn_blocking({ - move || -> Result { - // Unzip the wheel into a temporary directory. - let parent = download - .target() - .parent() - .expect("Cache paths can't be root"); - fs_err::create_dir_all(parent)?; - let staging = tempfile::tempdir_in(parent)?; - download.unzip(staging.path())?; - - // Move the unzipped wheel into the cache. 
- if let Err(err) = fs_err::rename(staging.into_path(), download.target()) { - // If another thread already unpacked the wheel, we can ignore the error. - return if download.target().is_dir() { - warn!("Wheel is already unpacked: {}", download.remote()); - Ok(download.target().to_path_buf()) - } else { - Err(err.into()) - }; - } + let normalized_path = if matches!(download, LocalWheel::Unzipped(_)) { + // Just an optimization: Avoid spawning a blocking + // task if there is no work to be done. + download.target().to_path_buf() + } else { + tokio::task::spawn_blocking({ + move || -> Result { + // Unzip the wheel into a temporary directory. + let parent = download + .target() + .parent() + .expect("Cache paths can't be root"); + fs_err::create_dir_all(parent)?; + let staging = tempfile::tempdir_in(parent)?; + download.unzip(staging.path())?; + + // Move the unzipped wheel into the cache. + if let Err(err) = fs_err::rename(staging.into_path(), download.target()) { + // If another thread already unpacked the wheel, we can ignore the error. + return if download.target().is_dir() { + warn!("Wheel is already unpacked: {}", download.remote()); + Ok(download.target().to_path_buf()) + } else { + Err(err.into()) + }; + } - Ok(download.target().to_path_buf()) - } - }) - .await? - .map_err(|err| Error::Unzip(remote.clone(), err))?; + Ok(download.target().to_path_buf()) + } + }) + .await? + .map_err(|err| Error::Unzip(remote.clone(), err))? + }; Ok(CachedDist::from_remote(remote, filename, normalized_path)) }