Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache downloaded wheel when range requests aren't supported #5089

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 4 additions & 109 deletions crates/uv-client/src/registry_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@ use std::path::{Path, PathBuf};
use std::str::FromStr;

use async_http_range_reader::AsyncHttpRangeReader;
use futures::{FutureExt, TryStreamExt};
use futures::FutureExt;
use http::HeaderMap;
use reqwest::{Client, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use tokio_util::compat::TokioAsyncReadCompatExt;
use tracing::{info_span, instrument, trace, warn, Instrument};
use url::Url;

use distribution_filename::{DistFilename, SourceDistFilename, WheelFilename};
use distribution_types::{BuiltDist, File, FileLocation, IndexUrl, IndexUrls, Name};
use install_wheel_rs::metadata::{find_archive_dist_info, is_metadata_entry};
use install_wheel_rs::metadata::find_archive_dist_info;
use pep440_rs::Version;
use pep508_rs::MarkerEnvironment;
use platform_tags::Platform;
Expand Down Expand Up @@ -598,76 +597,16 @@ impl RegistryClient {
.instrument(info_span!("read_metadata_range_request", wheel = %filename))
};

let result = self
.cached_client()
self.cached_client()
.get_serde(
req,
&cache_entry,
cache_control,
read_metadata_range_request,
)
.await
.map_err(crate::Error::from);

match result {
Ok(metadata) => return Ok(metadata),
Err(err) => {
if err.is_http_range_requests_unsupported() {
// The range request version failed. Fall back to streaming the file to search
// for the METADATA file.
warn!("Range requests not supported for {filename}; streaming wheel");
} else {
return Err(err);
}
}
};

// Create a request to stream the file.
let req = self
.uncached_client()
.get(url.clone())
.header(
// `reqwest` defaults to accepting compressed responses.
// Specify identity encoding to get consistent .whl downloading
// behavior from servers. ref: https://github.com/pypa/pip/pull/1688
"accept-encoding",
reqwest::header::HeaderValue::from_static("identity"),
)
.build()
.map_err(ErrorKind::from)?;

// Stream the file, searching for the METADATA.
let read_metadata_stream = |response: Response| {
async {
let reader = response
.bytes_stream()
.map_err(|err| self.handle_response_errors(err))
.into_async_read();

read_metadata_async_stream(filename, url.to_string(), reader).await
}
.instrument(info_span!("read_metadata_stream", wheel = %filename))
};

self.cached_client()
.get_serde(req, &cache_entry, cache_control, read_metadata_stream)
.await
.map_err(crate::Error::from)
}

/// Translate a `reqwest` error into an [`io::Error`].
///
/// A timeout becomes an [`io::ErrorKind::TimedOut`] error whose message reports the
/// current timeout (as returned by `self.timeout()`) and points at `UV_HTTP_TIMEOUT`.
/// Any other error is wrapped unchanged as [`io::ErrorKind::Other`].
fn handle_response_errors(&self, err: reqwest::Error) -> std::io::Error {
    // Anything that isn't a timeout is passed through as the error source.
    if !err.is_timeout() {
        return std::io::Error::new(std::io::ErrorKind::Other, err);
    }

    let message = format!(
        "Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).",
        self.timeout()
    );
    std::io::Error::new(std::io::ErrorKind::TimedOut, message)
}
}

/// Read a wheel's `METADATA` file from a zip file.
Expand Down Expand Up @@ -708,50 +647,6 @@ async fn read_metadata_async_seek(
Ok(metadata)
}

/// Like [`read_metadata_async_seek`], but doesn't use seek.
///
/// Walks the zip entries of `reader` in stream order and parses the first entry that
/// [`is_metadata_entry`] accepts for `filename`. `debug_source` is only used to label
/// errors.
///
/// # Errors
///
/// Returns an error if the stream cannot be read as a zip archive, if reading the
/// `METADATA` entry fails, if its contents fail to parse, or if the archive ends without
/// a matching entry.
async fn read_metadata_async_stream<R: futures::AsyncRead + Unpin>(
    filename: &WheelFilename,
    debug_source: String,
    reader: R,
) -> Result<Metadata23, Error> {
    let reader = futures::io::BufReader::with_capacity(128 * 1024, reader);
    let mut zip = async_zip::base::read::stream::ZipFileReader::new(reader);

    while let Some(mut entry) = zip
        .next_with_entry()
        .await
        .map_err(|err| ErrorKind::Zip(filename.clone(), err))?
    {
        // Find the `METADATA` entry.
        let path = entry
            .reader()
            .entry()
            .filename()
            .as_str()
            .map_err(|err| ErrorKind::Zip(filename.clone(), err))?;

        if is_metadata_entry(path, filename) {
            let mut reader = entry.reader_mut().compat();
            let mut contents = Vec::new();
            // A failed read is an ordinary I/O condition (e.g. a truncated or interrupted
            // stream), so propagate it to the caller instead of panicking.
            // NOTE(review): assumes `ErrorKind` has an `Io` variant wrapping
            // `std::io::Error` — confirm against the error definitions.
            reader
                .read_to_end(&mut contents)
                .await
                .map_err(ErrorKind::Io)?;

            let metadata = Metadata23::parse_metadata(&contents).map_err(|err| {
                ErrorKind::MetadataParseError(filename.clone(), debug_source, Box::new(err))
            })?;
            return Ok(metadata);
        }

        // Close current file to get access to the next one. See docs:
        // https://docs.rs/async_zip/0.0.16/async_zip/base/read/stream/
        zip = entry
            .skip()
            .await
            .map_err(|err| ErrorKind::Zip(filename.clone(), err))?;
    }

    Err(ErrorKind::MetadataNotFound(filename.clone(), debug_source).into())
}

#[derive(
Default, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Deserialize, rkyv::Serialize,
)]
Expand Down
63 changes: 59 additions & 4 deletions crates/uv-distribution/src/distribution_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,31 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
hashes: archive.hashes,
filename: wheel.filename.clone(),
}),
Err(Error::Client(err)) if err.is_http_streaming_unsupported() => {
warn!(
"Streaming unsupported for {dist}; downloading wheel to disk ({err})"
);

// If the request failed because streaming is unsupported, download the
// wheel directly.
let archive = self
.download_wheel(
url,
&wheel.filename,
wheel.file.size,
&wheel_entry,
dist,
hashes,
)
.await?;

Ok(LocalWheel {
dist: Dist::Built(dist.clone()),
archive: self.build_context.cache().archive(&archive.id),
hashes: archive.hashes,
filename: wheel.filename.clone(),
})
}
Err(Error::Extract(err)) => {
if err.is_http_streaming_unsupported() {
warn!(
Expand Down Expand Up @@ -293,6 +318,36 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
filename: wheel.filename.clone(),
})
}
Err(Error::Extract(err)) => {
if err.is_http_streaming_unsupported() {
warn!(
"Streaming unsupported for {dist}; downloading wheel to disk ({err})"
);
} else if err.is_http_streaming_failed() {
warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
} else {
return Err(Error::Extract(err));
}

// If the request failed because streaming is unsupported, download the
// wheel directly.
let archive = self
.download_wheel(
wheel.url.raw().clone(),
&wheel.filename,
None,
&wheel_entry,
dist,
hashes,
)
.await?;
Ok(LocalWheel {
dist: Dist::Built(dist.clone()),
archive: self.build_context.cache().archive(&archive.id),
hashes: archive.hashes,
filename: wheel.filename.clone(),
})
}
Err(err) => Err(err),
}
}
Expand Down Expand Up @@ -390,11 +445,11 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {

match result {
Ok(metadata) => Ok(ArchiveMetadata::from_metadata23(metadata)),
Err(err) if err.is_http_streaming_unsupported() => {
warn!("Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})");
Err(err) if err.is_http_range_requests_unsupported() => {
warn!("Range requests unsupported when fetching metadata for {dist}; downloading wheel directly ({err})");

// If the request failed due to an error that could be resolved by
// downloading the wheel directly, try that.
// If the request failed due to an error that could be resolved by downloading the
// wheel directly, try that.
let wheel = self.get_wheel(dist, hashes).await?;
let metadata = wheel.metadata()?;
let hashes = wheel.hashes;
Expand Down
1 change: 0 additions & 1 deletion crates/uv/tests/pip_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3227,7 +3227,6 @@ fn no_stream() -> Result<()> {

----- stderr -----
Resolved 1 package in [TIME]
Prepared 1 package in [TIME]
Installed 1 package in [TIME]
+ hashb-foxglove-protocolbuffers-python==25.3.0.1.20240226043130+465630478360
"###
Expand Down
Loading