Skip to content

Commit

Permalink
Cache downloaded wheel when range requests aren't supported (#5089)
Browse files Browse the repository at this point in the history
## Summary

When range requests aren't supported, we fall back to streaming the
wheel, stopping as soon as we hit a `METADATA` file. This is a small
optimization, but the downside is that we don't get to cache the
resulting wheel...

We don't know whether `METADATA` will be at the beginning or end of the
wheel, but it _seems_ like a better tradeoff to download and cache the
entire wheel?

Closes: #5088.

Sort of a revert of: #1792.
  • Loading branch information
charliermarsh committed Jul 16, 2024
1 parent 0e7df96 commit 2ff3b38
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 114 deletions.
113 changes: 4 additions & 109 deletions crates/uv-client/src/registry_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@ use std::path::{Path, PathBuf};
use std::str::FromStr;

use async_http_range_reader::AsyncHttpRangeReader;
use futures::{FutureExt, TryStreamExt};
use futures::FutureExt;
use http::HeaderMap;
use reqwest::{Client, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use tokio_util::compat::TokioAsyncReadCompatExt;
use tracing::{info_span, instrument, trace, warn, Instrument};
use url::Url;

use distribution_filename::{DistFilename, SourceDistFilename, WheelFilename};
use distribution_types::{BuiltDist, File, FileLocation, IndexUrl, IndexUrls, Name};
use install_wheel_rs::metadata::{find_archive_dist_info, is_metadata_entry};
use install_wheel_rs::metadata::find_archive_dist_info;
use pep440_rs::Version;
use pep508_rs::MarkerEnvironment;
use platform_tags::Platform;
Expand Down Expand Up @@ -598,76 +597,16 @@ impl RegistryClient {
.instrument(info_span!("read_metadata_range_request", wheel = %filename))
};

let result = self
.cached_client()
self.cached_client()
.get_serde(
req,
&cache_entry,
cache_control,
read_metadata_range_request,
)
.await
.map_err(crate::Error::from);

match result {
Ok(metadata) => return Ok(metadata),
Err(err) => {
if err.is_http_range_requests_unsupported() {
// The range request version failed. Fall back to streaming the file to search
// for the METADATA file.
warn!("Range requests not supported for {filename}; streaming wheel");
} else {
return Err(err);
}
}
};

// Create a request to stream the file.
let req = self
.uncached_client()
.get(url.clone())
.header(
// `reqwest` defaults to accepting compressed responses.
// Specify identity encoding to get consistent .whl downloading
// behavior from servers. ref: https://github.com/pypa/pip/pull/1688
"accept-encoding",
reqwest::header::HeaderValue::from_static("identity"),
)
.build()
.map_err(ErrorKind::from)?;

// Stream the file, searching for the METADATA.
let read_metadata_stream = |response: Response| {
async {
let reader = response
.bytes_stream()
.map_err(|err| self.handle_response_errors(err))
.into_async_read();

read_metadata_async_stream(filename, url.to_string(), reader).await
}
.instrument(info_span!("read_metadata_stream", wheel = %filename))
};

self.cached_client()
.get_serde(req, &cache_entry, cache_control, read_metadata_stream)
.await
.map_err(crate::Error::from)
}

/// Handle a specific `reqwest` error, and convert it to [`io::Error`].
fn handle_response_errors(&self, err: reqwest::Error) -> std::io::Error {
if err.is_timeout() {
std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
"Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).", self.timeout()
),
)
} else {
std::io::Error::new(std::io::ErrorKind::Other, err)
}
}
}

/// Read a wheel's `METADATA` file from a zip file.
Expand Down Expand Up @@ -708,50 +647,6 @@ async fn read_metadata_async_seek(
Ok(metadata)
}

/// Like [`read_metadata_async_seek`], but doesn't use seek.
async fn read_metadata_async_stream<R: futures::AsyncRead + Unpin>(
filename: &WheelFilename,
debug_source: String,
reader: R,
) -> Result<Metadata23, Error> {
let reader = futures::io::BufReader::with_capacity(128 * 1024, reader);
let mut zip = async_zip::base::read::stream::ZipFileReader::new(reader);

while let Some(mut entry) = zip
.next_with_entry()
.await
.map_err(|err| ErrorKind::Zip(filename.clone(), err))?
{
// Find the `METADATA` entry.
let path = entry
.reader()
.entry()
.filename()
.as_str()
.map_err(|err| ErrorKind::Zip(filename.clone(), err))?;

if is_metadata_entry(path, filename) {
let mut reader = entry.reader_mut().compat();
let mut contents = Vec::new();
reader.read_to_end(&mut contents).await.unwrap();

let metadata = Metadata23::parse_metadata(&contents).map_err(|err| {
ErrorKind::MetadataParseError(filename.clone(), debug_source, Box::new(err))
})?;
return Ok(metadata);
}

// Close current file to get access to the next one. See docs:
// https://docs.rs/async_zip/0.0.16/async_zip/base/read/stream/
zip = entry
.skip()
.await
.map_err(|err| ErrorKind::Zip(filename.clone(), err))?;
}

Err(ErrorKind::MetadataNotFound(filename.clone(), debug_source).into())
}

#[derive(
Default, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Deserialize, rkyv::Serialize,
)]
Expand Down
63 changes: 59 additions & 4 deletions crates/uv-distribution/src/distribution_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,31 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
hashes: archive.hashes,
filename: wheel.filename.clone(),
}),
Err(Error::Client(err)) if err.is_http_streaming_unsupported() => {
warn!(
"Streaming unsupported for {dist}; downloading wheel to disk ({err})"
);

// If the request failed because streaming is unsupported, download the
// wheel directly.
let archive = self
.download_wheel(
url,
&wheel.filename,
wheel.file.size,
&wheel_entry,
dist,
hashes,
)
.await?;

Ok(LocalWheel {
dist: Dist::Built(dist.clone()),
archive: self.build_context.cache().archive(&archive.id),
hashes: archive.hashes,
filename: wheel.filename.clone(),
})
}
Err(Error::Extract(err)) => {
if err.is_http_streaming_unsupported() {
warn!(
Expand Down Expand Up @@ -293,6 +318,36 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
filename: wheel.filename.clone(),
})
}
Err(Error::Extract(err)) => {
if err.is_http_streaming_unsupported() {
warn!(
"Streaming unsupported for {dist}; downloading wheel to disk ({err})"
);
} else if err.is_http_streaming_failed() {
warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
} else {
return Err(Error::Extract(err));
}

// If the request failed because streaming is unsupported, download the
// wheel directly.
let archive = self
.download_wheel(
wheel.url.raw().clone(),
&wheel.filename,
None,
&wheel_entry,
dist,
hashes,
)
.await?;
Ok(LocalWheel {
dist: Dist::Built(dist.clone()),
archive: self.build_context.cache().archive(&archive.id),
hashes: archive.hashes,
filename: wheel.filename.clone(),
})
}
Err(err) => Err(err),
}
}
Expand Down Expand Up @@ -390,11 +445,11 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {

match result {
Ok(metadata) => Ok(ArchiveMetadata::from_metadata23(metadata)),
Err(err) if err.is_http_streaming_unsupported() => {
warn!("Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})");
Err(err) if err.is_http_range_requests_unsupported() => {
warn!("Range requests unsupported when fetching metadata for {dist}; downloading wheel directly ({err})");

// If the request failed due to an error that could be resolved by
// downloading the wheel directly, try that.
// If the request failed due to an error that could be resolved by downloading the
// wheel directly, try that.
let wheel = self.get_wheel(dist, hashes).await?;
let metadata = wheel.metadata()?;
let hashes = wheel.hashes;
Expand Down
1 change: 0 additions & 1 deletion crates/uv/tests/pip_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3227,7 +3227,6 @@ fn no_stream() -> Result<()> {
----- stderr -----
Resolved 1 package in [TIME]
Prepared 1 package in [TIME]
Installed 1 package in [TIME]
+ hashb-foxglove-protocolbuffers-python==25.3.0.1.20240226043130+465630478360
"###
Expand Down

0 comments on commit 2ff3b38

Please sign in to comment.