Skip to content

Commit

Permalink
Unzip while downloading (#856)
Browse files Browse the repository at this point in the history
  • Loading branch information
bojanserafimov authored Jan 11, 2024
1 parent 4123a35 commit 10227a7
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 52 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 53 additions & 24 deletions crates/puffin-distribution/src/distribution_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;

use bytesize::ByteSize;
use fs_err::tokio as fs;
use puffin_extract::unzip_no_seek;
use thiserror::Error;
use tokio::task::JoinError;
use tokio_util::compat::FuturesAsyncReadCompatExt;
Expand All @@ -21,7 +22,7 @@ use puffin_git::GitSource;
use puffin_traits::BuildContext;
use pypi_types::Metadata21;

use crate::download::BuiltWheel;
use crate::download::{BuiltWheel, UnzippedWheel};
use crate::locks::Locks;
use crate::reporter::Facade;
use crate::{
Expand All @@ -37,6 +38,8 @@ pub enum DistributionDatabaseError {
#[error(transparent)]
Client(#[from] puffin_client::Error),
#[error(transparent)]
Extract(#[from] puffin_extract::Error),
#[error(transparent)]
Io(#[from] io::Error),
#[error(transparent)]
Distribution(#[from] distribution_types::Error),
Expand Down Expand Up @@ -108,30 +111,62 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
) -> Result<LocalWheel, DistributionDatabaseError> {
match &dist {
Dist::Built(BuiltDist::Registry(wheel)) => {
// Fetch the wheel.
let url = wheel
.base
.join_relative(&wheel.file.url)
.map_err(|err| DistributionDatabaseError::Url(wheel.file.url.clone(), err))?;

// Make cache entry
let wheel_filename = WheelFilename::from_str(&wheel.file.filename)?;
let cache_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Index(&wheel.index).remote_wheel_dir(wheel_filename.name.as_ref()),
wheel_filename.stem(),
);

// Start the download
let reader = self.client.stream_external(&url).await?;

// In all wheels we've seen so far, unzipping while downloading is the
// faster option.
//
// Writing to a file first may be faster if the wheel takes longer to
// unzip than it takes to download. This may happen if the wheel is a
// zip bomb, or if the machine has a weak cpu (with many cores), but a
// fast network.
//
// If we find such a case, it may make sense to create separate tasks
// for downloading and unzipping (with a buffer in between) and switch
// to rayon if this buffer grows large by the time the file is fully
// downloaded.
let unzip_while_downloading = true;
if unzip_while_downloading {
// Download and unzip to a temporary dir
let temp_dir = tempfile::tempdir_in(self.cache.root())?;
let temp_target = temp_dir.path().join(&wheel.file.filename);
unzip_no_seek(reader.compat(), &temp_target).await?;

// Move the dir to the right place
fs::create_dir_all(&cache_entry.dir()).await?;
let target = cache_entry.into_path_buf();
tokio::fs::rename(temp_target, &target).await?;

return Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
target,
filename: wheel_filename,
}));
}

// If the file is greater than 5MB, write it to disk; otherwise, keep it in memory.
//
// TODO this is currently dead code. Consider deleting if there's no use for it.
let byte_size = wheel.file.size.map(ByteSize::b);
let local_wheel = if let Some(byte_size) =
byte_size.filter(|byte_size| *byte_size < ByteSize::mb(5))
{
debug!("Fetching in-memory wheel from registry: {dist} ({byte_size})",);

let cache_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Index(&wheel.index)
.remote_wheel_dir(wheel_filename.name.as_ref()),
wheel_filename.stem(),
);

// Read into a buffer.
let mut buffer = Vec::with_capacity(
wheel
Expand Down Expand Up @@ -170,7 +205,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
CacheBucket::Wheels,
WheelCache::Index(&wheel.index)
.remote_wheel_dir(wheel_filename.name.as_ref()),
filename,
filename, // TODO should this be filename.stem() to match the other branch?
);
fs::create_dir_all(&cache_entry.dir()).await?;
tokio::fs::rename(temp_file, &cache_entry.path()).await?;
Expand All @@ -193,31 +228,25 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
debug!("Fetching disk-based wheel from URL: {}", wheel.url);

let reader = self.client.stream_external(&wheel.url).await?;
let filename = wheel.filename.to_string();

// Download the wheel to a temporary file.
// Download and unzip the wheel to a temporary dir.
let temp_dir = tempfile::tempdir_in(self.cache.root())?;
let temp_file = temp_dir.path().join(&filename);
let mut writer =
tokio::io::BufWriter::new(tokio::fs::File::create(&temp_file).await?);
tokio::io::copy(&mut reader.compat(), &mut writer).await?;
let temp_target = temp_dir.path().join(wheel.filename.to_string());
unzip_no_seek(reader.compat(), &temp_target).await?;

// Move the temporary file to the cache.
let cache_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Url(&wheel.url).remote_wheel_dir(wheel.name().as_ref()),
filename,
wheel.filename.stem(),
);
fs::create_dir_all(&cache_entry.dir()).await?;
tokio::fs::rename(temp_file, &cache_entry.path()).await?;
let target = cache_entry.into_path_buf();
tokio::fs::rename(temp_target, &target).await?;

let local_wheel = LocalWheel::Disk(DiskWheel {
let local_wheel = LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
target: cache_entry
.with_file(wheel.filename.stem())
.path()
.to_path_buf(),
path: cache_entry.into_path_buf(),
target,
filename: wheel.filename.clone(),
});

Expand Down
22 changes: 22 additions & 0 deletions crates/puffin-distribution/src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ use pypi_types::Metadata21;

use crate::error::Error;

/// A wheel that's been unzipped while downloading
#[derive(Debug, Clone)]
pub struct UnzippedWheel {
/// The remote distribution from which this wheel was downloaded.
pub(crate) dist: Dist,
/// The parsed filename.
pub(crate) filename: WheelFilename,
/// The path in the cache dir where the wheel was downloaded.
pub(crate) target: PathBuf,
}

/// A downloaded wheel that's stored in-memory.
#[derive(Debug, Clone)]
pub struct InMemoryWheel {
Expand Down Expand Up @@ -52,6 +63,7 @@ pub struct BuiltWheel {
/// A downloaded or built wheel.
#[derive(Debug, Clone)]
pub enum LocalWheel {
Unzipped(UnzippedWheel),
InMemory(InMemoryWheel),
Disk(DiskWheel),
Built(BuiltWheel),
Expand All @@ -61,6 +73,7 @@ impl LocalWheel {
/// Return the path to the downloaded wheel's entry in the cache.
pub fn target(&self) -> &Path {
match self {
LocalWheel::Unzipped(wheel) => &wheel.target,
LocalWheel::InMemory(wheel) => &wheel.target,
LocalWheel::Disk(wheel) => &wheel.target,
LocalWheel::Built(wheel) => &wheel.target,
Expand All @@ -70,6 +83,7 @@ impl LocalWheel {
/// Return the [`Dist`] from which this wheel was downloaded.
pub fn remote(&self) -> &Dist {
match self {
LocalWheel::Unzipped(wheel) => wheel.remote(),
LocalWheel::InMemory(wheel) => wheel.remote(),
LocalWheel::Disk(wheel) => wheel.remote(),
LocalWheel::Built(wheel) => wheel.remote(),
Expand All @@ -79,13 +93,21 @@ impl LocalWheel {
/// Return the [`WheelFilename`] of this wheel.
pub fn filename(&self) -> &WheelFilename {
match self {
LocalWheel::Unzipped(wheel) => &wheel.filename,
LocalWheel::InMemory(wheel) => &wheel.filename,
LocalWheel::Disk(wheel) => &wheel.filename,
LocalWheel::Built(wheel) => &wheel.filename,
}
}
}

impl UnzippedWheel {
/// Return the [`Dist`] from which this wheel was downloaded.
pub fn remote(&self) -> &Dist {
&self.dist
}
}

impl DiskWheel {
/// Return the [`Dist`] from which this wheel was downloaded.
pub fn remote(&self) -> &Dist {
Expand Down
1 change: 1 addition & 0 deletions crates/puffin-distribution/src/unzip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl Unzip for BuiltWheel {
impl Unzip for LocalWheel {
fn unzip(&self, target: &Path) -> Result<(), Error> {
match self {
LocalWheel::Unzipped(_) => Ok(()),
LocalWheel::InMemory(wheel) => wheel.unzip(target),
LocalWheel::Disk(wheel) => wheel.unzip(target),
LocalWheel::Built(wheel) => wheel.unzip(target),
Expand Down
2 changes: 2 additions & 0 deletions crates/puffin-extract/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ license = { workspace = true }
workspace = true

[dependencies]
tokio-util = { workspace = true, features = ["compat"] }
async_zip = { workspace = true, features = ["tokio"] }
flate2 = { workspace = true }
fs-err = { workspace = true }
rayon = { workspace = true }
Expand Down
41 changes: 41 additions & 0 deletions crates/puffin-extract/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::path::{Path, PathBuf};

use rayon::prelude::*;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use zip::result::ZipError;
use zip::ZipArchive;

Expand All @@ -13,6 +14,8 @@ pub enum Error {
#[error(transparent)]
Zip(#[from] ZipError),
#[error(transparent)]
AsyncZip(#[from] async_zip::error::ZipError),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error("Unsupported archive type: {0}")]
UnsupportedArchive(PathBuf),
Expand All @@ -22,6 +25,44 @@ pub enum Error {
InvalidArchive(Vec<fs_err::DirEntry>),
}

/// Unzip a `.zip` archive into the target directory without requiring Seek.
///
/// This is useful for unzipping files as they're being downloaded. If the archive
/// is already fully on disk, consider using `unzip_archive`, which can use multiple
/// threads to work faster in that case.
pub async fn unzip_no_seek<R: tokio::io::AsyncRead + Unpin>(
reader: R,
target: &Path,
) -> Result<(), Error> {
let mut zip = async_zip::base::read::stream::ZipFileReader::with_tokio(reader);

while let Some(mut entry) = zip.next_with_entry().await? {
// Construct path
let path = entry.reader().entry().filename().as_str()?;
let path = target.join(path);
let is_dir = entry.reader().entry().dir()?;

// Create dir or write file
if is_dir {
tokio::fs::create_dir_all(path).await?;
} else {
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let file = tokio::fs::File::create(path).await?;
let mut writer = tokio::io::BufWriter::new(file);
let mut reader = entry.reader_mut().compat();
tokio::io::copy(&mut reader, &mut writer).await?;
}

// Close current file to get access to the next one. See docs:
// https://docs.rs/async_zip/0.0.16/async_zip/base/read/stream/
zip = entry.skip().await?;
}

Ok(())
}

/// Unzip a `.zip` archive into the target directory.
pub fn unzip_archive<R: Send + std::io::Read + std::io::Seek + HasLength>(
reader: R,
Expand Down
58 changes: 32 additions & 26 deletions crates/puffin-installer/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,33 +205,39 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
}

// Unzip the wheel.
let normalized_path = tokio::task::spawn_blocking({
move || -> Result<PathBuf, puffin_extract::Error> {
// Unzip the wheel into a temporary directory.
let parent = download
.target()
.parent()
.expect("Cache paths can't be root");
fs_err::create_dir_all(parent)?;
let staging = tempfile::tempdir_in(parent)?;
download.unzip(staging.path())?;

// Move the unzipped wheel into the cache.
if let Err(err) = fs_err::rename(staging.into_path(), download.target()) {
// If another thread already unpacked the wheel, we can ignore the error.
return if download.target().is_dir() {
warn!("Wheel is already unpacked: {}", download.remote());
Ok(download.target().to_path_buf())
} else {
Err(err.into())
};
}
let normalized_path = if matches!(download, LocalWheel::Unzipped(_)) {
// Just an optimizaion: Avoid spawning a blocking
// task if there is no work to be done.
download.target().to_path_buf()
} else {
tokio::task::spawn_blocking({
move || -> Result<PathBuf, puffin_extract::Error> {
// Unzip the wheel into a temporary directory.
let parent = download
.target()
.parent()
.expect("Cache paths can't be root");
fs_err::create_dir_all(parent)?;
let staging = tempfile::tempdir_in(parent)?;
download.unzip(staging.path())?;

// Move the unzipped wheel into the cache.
if let Err(err) = fs_err::rename(staging.into_path(), download.target()) {
// If another thread already unpacked the wheel, we can ignore the error.
return if download.target().is_dir() {
warn!("Wheel is already unpacked: {}", download.remote());
Ok(download.target().to_path_buf())
} else {
Err(err.into())
};
}

Ok(download.target().to_path_buf())
}
})
.await?
.map_err(|err| Error::Unzip(remote.clone(), err))?;
Ok(download.target().to_path_buf())
}
})
.await?
.map_err(|err| Error::Unzip(remote.clone(), err))?
};

Ok(CachedDist::from_remote(remote, filename, normalized_path))
}
Expand Down

0 comments on commit 10227a7

Please sign in to comment.