Unzip while downloading #856

Merged · 22 commits · Jan 11, 2024
6 changes: 4 additions & 2 deletions Cargo.lock


77 changes: 53 additions & 24 deletions crates/puffin-distribution/src/distribution_database.rs
@@ -6,6 +6,7 @@ use std::sync::Arc;

use bytesize::ByteSize;
use fs_err::tokio as fs;
use puffin_extract::unzip_no_seek;
use thiserror::Error;
use tokio::task::JoinError;
use tokio_util::compat::FuturesAsyncReadCompatExt;
@@ -21,7 +22,7 @@ use puffin_git::GitSource;
use puffin_traits::BuildContext;
use pypi_types::Metadata21;

use crate::download::BuiltWheel;
use crate::download::{BuiltWheel, UnzippedWheel};
use crate::locks::Locks;
use crate::reporter::Facade;
use crate::{
@@ -37,6 +38,8 @@ pub enum DistributionDatabaseError {
#[error(transparent)]
Client(#[from] puffin_client::Error),
#[error(transparent)]
Extract(#[from] puffin_extract::Error),
#[error(transparent)]
Io(#[from] io::Error),
#[error(transparent)]
Distribution(#[from] distribution_types::Error),
@@ -108,30 +111,62 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
) -> Result<LocalWheel, DistributionDatabaseError> {
match &dist {
Dist::Built(BuiltDist::Registry(wheel)) => {
// Fetch the wheel.
let url = wheel
.base
.join_relative(&wheel.file.url)
.map_err(|err| DistributionDatabaseError::Url(wheel.file.url.clone(), err))?;

// Make cache entry
let wheel_filename = WheelFilename::from_str(&wheel.file.filename)?;
let cache_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Index(&wheel.index).remote_wheel_dir(wheel_filename.name.as_ref()),
wheel_filename.stem(),
);

// Start the download
let reader = self.client.stream_external(&url).await?;

// In all wheels we've seen so far, unzipping while downloading is the
// faster option.
//
// Writing to a file first may be faster if the wheel takes longer to
// unzip than it takes to download. This may happen if the wheel is a
// zip bomb, or if the machine has a weak CPU (but many cores) and a
// fast network.
//
// If we find such a case, it may make sense to create separate tasks
// for downloading and unzipping (with a buffer in between) and switch
// to rayon if this buffer grows large by the time the file is fully
// downloaded.
let unzip_while_downloading = true;
if unzip_while_downloading {
// Download and unzip to a temporary dir
let temp_dir = tempfile::tempdir_in(self.cache.root())?;
let temp_target = temp_dir.path().join(&wheel.file.filename);
unzip_no_seek(reader.compat(), &temp_target).await?;

// Move the dir to the right place
fs::create_dir_all(&cache_entry.dir()).await?;
let target = cache_entry.into_path_buf();
tokio::fs::rename(temp_target, &target).await?;

return Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
target,
filename: wheel_filename,
}));
}
Member:

What is the thinking behind leaving the dead code below? Is it so the code can be changed back to the old strategy easily? I think I'd prefer just deleting the dead code. It can always be revised from source history if necessary.

An alternative would be to make a runtime option to toggle between the two strategies if we wanted to keep both around, but I'm not sure how valuable that is.

Contributor Author:

There's a good chance we'd want this code just because we'll get more tests and it would make sense to compare both versions. Current tests are done on my machine only.

We could dig out this code from history, but there will be merge conflicts with changes that pile up.

If we want to delete this now I'd still do it in a separate "refactor only" PR. There would be too much noise in this PR if I do it now.

Contributor Author (@bojanserafimov, Jan 10, 2024):

Lmk what you prefer. I can send a PR deleting this on top of this one. There will be a cascade of related deletes, which is why I think bringing it back will involve merge conflicts

Member:

IMO best practice would be to either: (1) remove, since we can always restore from source control; or (2) retain, but thread through a CLI flag (could be user-hidden) to let us toggle, and ensure there's test coverage. Otherwise, it's very likely to grow stale while still requiring some amount of maintenance, which is kind of the worst of both worlds.

Contributor Author:

Ok I'll remove it

Member:

Sounds good, you can do it in a separate PR with this branch marked as upstream if you like.


// If the file is greater than 5MB, write it to disk; otherwise, keep it in memory.
//
// TODO this is currently dead code. Consider deleting if there's no use for it.
let byte_size = wheel.file.size.map(ByteSize::b);
let local_wheel = if let Some(byte_size) =
byte_size.filter(|byte_size| *byte_size < ByteSize::mb(5))
{
debug!("Fetching in-memory wheel from registry: {dist} ({byte_size})",);

let cache_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Index(&wheel.index)
.remote_wheel_dir(wheel_filename.name.as_ref()),
wheel_filename.stem(),
);

// Read into a buffer.
let mut buffer = Vec::with_capacity(
wheel
@@ -170,7 +205,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
CacheBucket::Wheels,
WheelCache::Index(&wheel.index)
.remote_wheel_dir(wheel_filename.name.as_ref()),
filename,
filename, // TODO should this be filename.stem() to match the other branch?
Member:

Oh hmm... I would say they should both use .stem(), yeah. Can you try this in a separate PR?

Contributor Author:

This code will be deleted

);
fs::create_dir_all(&cache_entry.dir()).await?;
tokio::fs::rename(temp_file, &cache_entry.path()).await?;
@@ -193,31 +228,25 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
debug!("Fetching disk-based wheel from URL: {}", wheel.url);

let reader = self.client.stream_external(&wheel.url).await?;
let filename = wheel.filename.to_string();

// Download the wheel to a temporary file.
// Download and unzip the wheel to a temporary dir.
let temp_dir = tempfile::tempdir_in(self.cache.root())?;
let temp_file = temp_dir.path().join(&filename);
let mut writer =
tokio::io::BufWriter::new(tokio::fs::File::create(&temp_file).await?);
tokio::io::copy(&mut reader.compat(), &mut writer).await?;
let temp_target = temp_dir.path().join(wheel.filename.to_string());
unzip_no_seek(reader.compat(), &temp_target).await?;

// Move the temporary file to the cache.
let cache_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Url(&wheel.url).remote_wheel_dir(wheel.name().as_ref()),
filename,
wheel.filename.stem(),
);
fs::create_dir_all(&cache_entry.dir()).await?;
tokio::fs::rename(temp_file, &cache_entry.path()).await?;
let target = cache_entry.into_path_buf();
tokio::fs::rename(temp_target, &target).await?;

let local_wheel = LocalWheel::Disk(DiskWheel {
let local_wheel = LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
target: cache_entry
.with_file(wheel.filename.stem())
.path()
.to_path_buf(),
path: cache_entry.into_path_buf(),
target,
filename: wheel.filename.clone(),
});

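The comment in the hunk above sketches a fallback strategy for slow-to-unzip wheels: separate tasks for downloading and unzipping, with a buffer in between. A std-only toy version of that pipeline — a bounded channel with a blocking producer — might look like the following. The function name, chunk sizes, and byte-counting "unzip" stand-in are all hypothetical, not part of this PR:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Producer ("download") feeds chunks into a bounded channel; the consumer
// ("unzip") drains them. If the consumer lags by more than `buffer_chunks`,
// the producer blocks -- a signal that unzipping is the bottleneck and that
// handing the work to a thread pool (e.g. rayon) might pay off.
fn pipeline(chunks: Vec<Vec<u8>>, buffer_chunks: usize) -> usize {
    let (tx, rx) = sync_channel::<Vec<u8>>(buffer_chunks);
    let producer = thread::spawn(move || {
        for chunk in chunks {
            // Blocks once the buffer is full.
            tx.send(chunk).expect("consumer hung up");
        }
        // Dropping `tx` closes the channel and ends the consumer loop.
    });
    let mut total = 0;
    for chunk in rx {
        // Stand-in for feeding the chunk to the unzipper.
        total += chunk.len();
    }
    producer.join().expect("producer panicked");
    total
}

fn main() {
    let chunks = vec![vec![0u8; 1024]; 8];
    assert_eq!(pipeline(chunks, 2), 8 * 1024);
}
```

The real implementation would use async tasks and a `tokio` channel rather than OS threads, but the backpressure shape is the same.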
22 changes: 22 additions & 0 deletions crates/puffin-distribution/src/download.rs
@@ -10,6 +10,17 @@ use pypi_types::Metadata21;

use crate::error::Error;

/// A wheel that's been unzipped while downloading.
#[derive(Debug, Clone)]
pub struct UnzippedWheel {
/// The remote distribution from which this wheel was downloaded.
pub(crate) dist: Dist,
/// The parsed filename.
pub(crate) filename: WheelFilename,
/// The path in the cache dir where the wheel was downloaded.
pub(crate) target: PathBuf,
}

/// A downloaded wheel that's stored in-memory.
#[derive(Debug, Clone)]
pub struct InMemoryWheel {
@@ -52,6 +63,7 @@ pub struct BuiltWheel {
/// A downloaded or built wheel.
#[derive(Debug, Clone)]
pub enum LocalWheel {
Unzipped(UnzippedWheel),
InMemory(InMemoryWheel),
Disk(DiskWheel),
Built(BuiltWheel),
@@ -61,6 +73,7 @@ impl LocalWheel {
/// Return the path to the downloaded wheel's entry in the cache.
pub fn target(&self) -> &Path {
match self {
LocalWheel::Unzipped(wheel) => &wheel.target,
LocalWheel::InMemory(wheel) => &wheel.target,
LocalWheel::Disk(wheel) => &wheel.target,
LocalWheel::Built(wheel) => &wheel.target,
@@ -70,6 +83,7 @@
/// Return the [`Dist`] from which this wheel was downloaded.
pub fn remote(&self) -> &Dist {
match self {
LocalWheel::Unzipped(wheel) => wheel.remote(),
LocalWheel::InMemory(wheel) => wheel.remote(),
LocalWheel::Disk(wheel) => wheel.remote(),
LocalWheel::Built(wheel) => wheel.remote(),
@@ -79,13 +93,21 @@
/// Return the [`WheelFilename`] of this wheel.
pub fn filename(&self) -> &WheelFilename {
match self {
LocalWheel::Unzipped(wheel) => &wheel.filename,
LocalWheel::InMemory(wheel) => &wheel.filename,
LocalWheel::Disk(wheel) => &wheel.filename,
LocalWheel::Built(wheel) => &wheel.filename,
}
}
}

impl UnzippedWheel {
/// Return the [`Dist`] from which this wheel was downloaded.
pub fn remote(&self) -> &Dist {
&self.dist
}
}

impl DiskWheel {
/// Return the [`Dist`] from which this wheel was downloaded.
pub fn remote(&self) -> &Dist {
1 change: 1 addition & 0 deletions crates/puffin-distribution/src/unzip.rs
@@ -31,6 +31,7 @@ impl Unzip for BuiltWheel {
impl Unzip for LocalWheel {
fn unzip(&self, target: &Path) -> Result<(), Error> {
match self {
LocalWheel::Unzipped(_) => Ok(()),
LocalWheel::InMemory(wheel) => wheel.unzip(target),
LocalWheel::Disk(wheel) => wheel.unzip(target),
LocalWheel::Built(wheel) => wheel.unzip(target),
2 changes: 2 additions & 0 deletions crates/puffin-extract/Cargo.toml
@@ -13,6 +13,8 @@ license = { workspace = true }
workspace = true

[dependencies]
tokio-util = { workspace = true, features = ["compat"] }
async_zip = { workspace = true, features = ["tokio"] }
flate2 = { workspace = true }
fs-err = { workspace = true }
rayon = { workspace = true }
41 changes: 41 additions & 0 deletions crates/puffin-extract/src/lib.rs
@@ -1,6 +1,7 @@
use std::path::{Path, PathBuf};

use rayon::prelude::*;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use zip::result::ZipError;
use zip::ZipArchive;

@@ -13,6 +14,8 @@ pub enum Error {
#[error(transparent)]
Zip(#[from] ZipError),
#[error(transparent)]
AsyncZip(#[from] async_zip::error::ZipError),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error("Unsupported archive type: {0}")]
UnsupportedArchive(PathBuf),
@@ -22,6 +25,44 @@
InvalidArchive(Vec<fs_err::DirEntry>),
}

/// Unzip a `.zip` archive into the target directory without requiring Seek.
///
/// This is useful for unzipping files as they're being downloaded. If the archive
/// is already fully on disk, consider using `unzip_archive`, which can use multiple
/// threads to work faster in that case.
pub async fn unzip_no_seek<R: tokio::io::AsyncRead + Unpin>(
reader: R,
target: &Path,
) -> Result<(), Error> {
let mut zip = async_zip::base::read::stream::ZipFileReader::with_tokio(reader);

while let Some(mut entry) = zip.next_with_entry().await? {
// Construct path
let path = entry.reader().entry().filename().as_str()?;
let path = target.join(path);
let is_dir = entry.reader().entry().dir()?;

// Create dir or write file
if is_dir {
tokio::fs::create_dir_all(path).await?;
} else {
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let file = tokio::fs::File::create(path).await?;
let mut writer = tokio::io::BufWriter::new(file);
let mut reader = entry.reader_mut().compat();
tokio::io::copy(&mut reader, &mut writer).await?;
}

// Close current file to get access to the next one. See docs:
// https://docs.rs/async_zip/0.0.16/async_zip/base/read/stream/
zip = entry.skip().await?;
}

Ok(())
}

/// Unzip a `.zip` archive into the target directory.
pub fn unzip_archive<R: Send + std::io::Read + std::io::Seek + HasLength>(
reader: R,
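A note on why `unzip_no_seek` can work at all: the zip format writes a local file header (signature `PK\x03\x04`) immediately before each entry's data, so a reader can walk entries front-to-back without seeking to the trailing central directory. A minimal std-only parse of that header illustrates the layout — this is a sketch of the file format, not of `async_zip` internals, and `parse_local_header` is a made-up helper:

```rust
// Parse the fixed 30-byte zip local file header, returning the filename and
// extra-field lengths if the signature matches. Layout (little-endian):
// signature (4), version (2), flags (2), method (2), mod time (2),
// mod date (2), crc32 (4), compressed size (4), uncompressed size (4),
// name length (2), extra length (2).
fn parse_local_header(buf: &[u8]) -> Option<(u16, u16)> {
    // 0x50 0x4b 0x03 0x04 is "PK\x03\x04".
    if buf.len() < 30 || buf[0..4] != [0x50, 0x4b, 0x03, 0x04] {
        return None;
    }
    let name_len = u16::from_le_bytes([buf[26], buf[27]]);
    let extra_len = u16::from_le_bytes([buf[28], buf[29]]);
    Some((name_len, extra_len))
}

fn main() {
    let mut header = vec![0u8; 30];
    header[0..4].copy_from_slice(b"PK\x03\x04");
    header[26..28].copy_from_slice(&9u16.to_le_bytes());
    assert_eq!(parse_local_header(&header), Some((9, 0)));
    assert_eq!(parse_local_header(b"not a zip"), None);
}
```

The caveat (noted in the `async_zip` docs) is that sizes and CRCs in the local header may be deferred to a trailing data descriptor, which is why the streaming reader requires closing each entry before advancing.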
58 changes: 32 additions & 26 deletions crates/puffin-installer/src/downloader.rs
@@ -205,33 +205,39 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
}

// Unzip the wheel.
let normalized_path = tokio::task::spawn_blocking({
move || -> Result<PathBuf, puffin_extract::Error> {
// Unzip the wheel into a temporary directory.
let parent = download
.target()
.parent()
.expect("Cache paths can't be root");
fs_err::create_dir_all(parent)?;
let staging = tempfile::tempdir_in(parent)?;
download.unzip(staging.path())?;

// Move the unzipped wheel into the cache.
if let Err(err) = fs_err::rename(staging.into_path(), download.target()) {
// If another thread already unpacked the wheel, we can ignore the error.
return if download.target().is_dir() {
warn!("Wheel is already unpacked: {}", download.remote());
Ok(download.target().to_path_buf())
} else {
Err(err.into())
};
}
let normalized_path = if matches!(download, LocalWheel::Unzipped(_)) {
// Just an optimization: avoid spawning a blocking
// task if there is no work to be done.
download.target().to_path_buf()
} else {
tokio::task::spawn_blocking({
move || -> Result<PathBuf, puffin_extract::Error> {
// Unzip the wheel into a temporary directory.
let parent = download
.target()
.parent()
.expect("Cache paths can't be root");
fs_err::create_dir_all(parent)?;
let staging = tempfile::tempdir_in(parent)?;
download.unzip(staging.path())?;

// Move the unzipped wheel into the cache.
if let Err(err) = fs_err::rename(staging.into_path(), download.target()) {
// If another thread already unpacked the wheel, we can ignore the error.
return if download.target().is_dir() {
warn!("Wheel is already unpacked: {}", download.remote());
Ok(download.target().to_path_buf())
} else {
Err(err.into())
};
}

Ok(download.target().to_path_buf())
}
})
.await?
.map_err(|err| Error::Unzip(remote.clone(), err))?;
Ok(download.target().to_path_buf())
}
})
.await?
.map_err(|err| Error::Unzip(remote.clone(), err))?
};

Ok(CachedDist::from_remote(remote, filename, normalized_path))
}
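The rename-into-cache step inside the `spawn_blocking` closure tolerates a racing worker that already unpacked the same wheel. That pattern is easy to isolate; this std-only sketch uses made-up paths and skips the tempdir staging of the real code:

```rust
use std::fs;
use std::io;
use std::path::Path;

// Move a staging directory to its final cache location. If the rename fails
// but the target already exists as a directory, assume another worker won the
// race and accept its copy -- mirroring the `warn!` branch in `downloader.rs`.
fn rename_or_tolerate(staging: &Path, target: &Path) -> io::Result<()> {
    if let Err(err) = fs::rename(staging, target) {
        if target.is_dir() {
            return Ok(()); // someone else already unpacked this wheel
        }
        return Err(err);
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let base = std::env::temp_dir().join("puffin_rename_demo");
    let _ = fs::remove_dir_all(&base); // start clean; ignore "not found"
    fs::create_dir_all(base.join("staging_a"))?;
    fs::write(base.join("staging_a").join("RECORD"), "")?;
    fs::create_dir_all(base.join("staging_b"))?;
    let target = base.join("cache_entry");
    rename_or_tolerate(&base.join("staging_a"), &target)?; // normal path
    rename_or_tolerate(&base.join("staging_b"), &target)?; // lost the race
    assert!(target.join("RECORD").exists());
    Ok(())
}
```

Checking `target.is_dir()` only after the rename fails keeps the common path to a single syscall while still handling concurrent installs of the same wheel.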