Skip to content

Commit

Permalink
Upgrade rs-async-zip to support data descriptors (#2809)
Browse files Browse the repository at this point in the history
## Summary

Upgrading `rs-async-zip` enables us to support data descriptors when
streaming ZIP archives. This both greatly improves performance for indexes
that use data descriptors _and_ ensures that we support them in a few other
places (e.g., zipped source distributions created in Finder).

Closes #2808.
  • Loading branch information
charliermarsh authored Apr 4, 2024
1 parent 34341bd commit dc2c289
Show file tree
Hide file tree
Showing 8 changed files with 19 additions and 19 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async-channel = { version = "2.2.0" }
async-compression = { version = "0.4.6" }
async-trait = { version = "0.1.78" }
async_http_range_reader = { version = "0.7.0" }
async_zip = { git = "https://github.com/charliermarsh/rs-async-zip", rev = "d76801da0943de985254fc6255c0e476b57c5836", features = ["deflate"] }
async_zip = { git = "https://github.com/charliermarsh/rs-async-zip", rev = "1dcb40cfe1bf5325a6fd4bfcf9894db40241f585", features = ["deflate"] }
axoupdater = { version = "0.3.1", default-features = false }
backoff = { version = "0.4.0" }
base64 = { version = "0.21.7" }
Expand Down
6 changes: 4 additions & 2 deletions crates/uv-client/src/registry_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use http::HeaderMap;
use reqwest::{Client, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use tracing::{info_span, instrument, trace, warn, Instrument};
use url::Url;

Expand Down Expand Up @@ -618,7 +618,8 @@ async fn read_metadata_async_seek(
debug_source: String,
reader: impl tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin,
) -> Result<Metadata23, Error> {
let mut zip_reader = async_zip::tokio::read::seek::ZipFileReader::with_tokio(reader)
let reader = futures::io::BufReader::new(reader.compat());
let mut zip_reader = async_zip::base::read::seek::ZipFileReader::new(reader)
.await
.map_err(|err| ErrorKind::Zip(filename.clone(), err))?;

Expand Down Expand Up @@ -655,6 +656,7 @@ async fn read_metadata_async_stream<R: futures::AsyncRead + Unpin>(
debug_source: String,
reader: R,
) -> Result<Metadata23, Error> {
let reader = futures::io::BufReader::with_capacity(128 * 1024, reader);
let mut zip = async_zip::base::read::stream::ZipFileReader::new(reader);

while let Some(mut entry) = zip
Expand Down
6 changes: 4 additions & 2 deletions crates/uv-client/src/remote_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_http_range_reader::AsyncHttpRangeReader;
use async_zip::tokio::read::seek::ZipFileReader;
use futures::io::BufReader;
use tokio_util::compat::TokioAsyncReadCompatExt;

use distribution_filename::WheelFilename;
Expand Down Expand Up @@ -61,7 +61,8 @@ pub(crate) async fn wheel_metadata_from_remote_zip(
.await;

// Construct a zip reader that uses the stream.
let mut reader = ZipFileReader::new(reader.compat())
let buf = BufReader::new(reader.compat());
let mut reader = async_zip::base::read::seek::ZipFileReader::new(buf)
.await
.map_err(|err| ErrorKind::Zip(filename.clone(), err))?;

Expand Down Expand Up @@ -90,6 +91,7 @@ pub(crate) async fn wheel_metadata_from_remote_zip(
reader
.inner_mut()
.get_mut()
.get_mut()
.prefetch(offset..offset + size)
.await;

Expand Down
3 changes: 1 addition & 2 deletions crates/uv-distribution/src/distribution_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
file.seek(io::SeekFrom::Start(0))
.await
.map_err(Error::CacheWrite)?;
let reader = tokio::io::BufReader::new(file);
uv_extract::seek::unzip(reader, temp_dir.path()).await?;
uv_extract::seek::unzip(file, temp_dir.path()).await?;

// Persist the temporary directory to the directory store.
let archive = self
Expand Down
3 changes: 1 addition & 2 deletions crates/uv-distribution/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1293,8 +1293,7 @@ async fn extract_archive(path: &Path, cache: &Cache) -> Result<ExtractedSource,
let reader = fs_err::tokio::File::open(&path)
.await
.map_err(Error::CacheRead)?;
uv_extract::seek::archive(tokio::io::BufReader::new(reader), path, &temp_dir.path())
.await?;
uv_extract::seek::archive(reader, path, &temp_dir.path()).await?;

// Extract the top-level directory from the archive.
let extracted = match uv_extract::strip_component(temp_dir.path()) {
Expand Down
4 changes: 2 additions & 2 deletions crates/uv-extract/src/seek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub async fn unzip<R: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin>(
target: impl AsRef<Path>,
) -> Result<(), Error> {
let target = target.as_ref();
let mut reader = reader.compat();
let mut reader = futures::io::BufReader::new(reader.compat());
let mut zip = async_zip::base::read::seek::ZipFileReader::new(&mut reader).await?;

let mut directories = FxHashSet::default();
Expand Down Expand Up @@ -81,7 +81,7 @@ pub async fn unzip<R: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin>(
}

/// Unzip a `.zip` or `.tar.gz` archive into the target directory, requiring `Seek`.
pub async fn archive<R: tokio::io::AsyncBufRead + tokio::io::AsyncSeek + Unpin>(
pub async fn archive<R: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin>(
reader: R,
source: impl AsRef<Path>,
target: impl AsRef<Path>,
Expand Down
10 changes: 4 additions & 6 deletions crates/uv-extract/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ pub async fn unzip<R: tokio::io::AsyncRead + Unpin>(
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;

// To avoid lots of small reads to `reader` when parsing the central directory, wrap it in
// a buffer.
let mut buf = futures::io::BufReader::new(reader);
let mut directory = async_zip::base::read::cd::CentralDirectoryReader::new(&mut buf);
let mut directory = async_zip::base::read::cd::CentralDirectoryReader::new(&mut reader);
while let Some(entry) = directory.next().await? {
if entry.dir()? {
continue;
Expand Down Expand Up @@ -154,10 +151,11 @@ async fn untar_in<R: tokio::io::AsyncRead + Unpin, P: AsRef<Path>>(
/// Unzip a `.tar.gz` archive into the target directory, without requiring `Seek`.
///
/// This is useful for unpacking files as they're being downloaded.
pub async fn untar<R: tokio::io::AsyncBufRead + Unpin>(
pub async fn untar<R: tokio::io::AsyncRead + Unpin>(
reader: R,
target: impl AsRef<Path>,
) -> Result<(), Error> {
let reader = tokio::io::BufReader::new(reader);
let decompressed_bytes = async_compression::tokio::bufread::GzipDecoder::new(reader);
let mut archive = tokio_tar::ArchiveBuilder::new(decompressed_bytes)
.set_preserve_mtime(false)
Expand All @@ -166,7 +164,7 @@ pub async fn untar<R: tokio::io::AsyncBufRead + Unpin>(
}

/// Unzip a `.zip` or `.tar.gz` archive into the target directory, without requiring `Seek`.
pub async fn archive<R: tokio::io::AsyncBufRead + Unpin>(
pub async fn archive<R: tokio::io::AsyncRead + Unpin>(
reader: R,
source: impl AsRef<Path>,
target: impl AsRef<Path>,
Expand Down

0 comments on commit dc2c289

Please sign in to comment.