diff --git a/crates/uv-client/src/registry_client.rs b/crates/uv-client/src/registry_client.rs index d0b51ec0d88e..b547022cc606 100644 --- a/crates/uv-client/src/registry_client.rs +++ b/crates/uv-client/src/registry_client.rs @@ -4,18 +4,17 @@ use std::path::{Path, PathBuf}; use std::str::FromStr; use async_http_range_reader::AsyncHttpRangeReader; -use futures::{FutureExt, TryStreamExt}; +use futures::FutureExt; use http::HeaderMap; use reqwest::{Client, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncReadExt; -use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt}; +use tokio_util::compat::TokioAsyncReadCompatExt; use tracing::{info_span, instrument, trace, warn, Instrument}; use url::Url; use distribution_filename::{DistFilename, SourceDistFilename, WheelFilename}; use distribution_types::{BuiltDist, File, FileLocation, IndexUrl, IndexUrls, Name}; -use install_wheel_rs::metadata::{find_archive_dist_info, is_metadata_entry}; +use install_wheel_rs::metadata::find_archive_dist_info; use pep440_rs::Version; use pep508_rs::MarkerEnvironment; use platform_tags::Platform; @@ -598,8 +597,7 @@ impl RegistryClient { .instrument(info_span!("read_metadata_range_request", wheel = %filename)) }; - let result = self - .cached_client() + self.cached_client() .get_serde( req, &cache_entry, @@ -607,67 +605,8 @@ impl RegistryClient { read_metadata_range_request, ) .await - .map_err(crate::Error::from); - - match result { - Ok(metadata) => return Ok(metadata), - Err(err) => { - if err.is_http_range_requests_unsupported() { - // The range request version failed. Fall back to streaming the file to search - // for the METADATA file. - warn!("Range requests not supported for {filename}; streaming wheel"); - } else { - return Err(err); - } - } - }; - - // Create a request to stream the file. - let req = self - .uncached_client() - .get(url.clone()) - .header( - // `reqwest` defaults to accepting compressed responses. - // Specify identity encoding to get consistent .whl downloading - // behavior from servers. ref: https://github.com/pypa/pip/pull/1688 - "accept-encoding", - reqwest::header::HeaderValue::from_static("identity"), - ) - .build() - .map_err(ErrorKind::from)?; - - // Stream the file, searching for the METADATA. - let read_metadata_stream = |response: Response| { - async { - let reader = response - .bytes_stream() - .map_err(|err| self.handle_response_errors(err)) - .into_async_read(); - - read_metadata_async_stream(filename, url.to_string(), reader).await - } - .instrument(info_span!("read_metadata_stream", wheel = %filename)) - }; - - self.cached_client() - .get_serde(req, &cache_entry, cache_control, read_metadata_stream) - .await .map_err(crate::Error::from) } - - /// Handle a specific `reqwest` error, and convert it to [`io::Error`]. - fn handle_response_errors(&self, err: reqwest::Error) -> std::io::Error { - if err.is_timeout() { - std::io::Error::new( - std::io::ErrorKind::TimedOut, - format!( - "Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).", self.timeout() - ), - ) - } else { - std::io::Error::new(std::io::ErrorKind::Other, err) - } - } } /// Read a wheel's `METADATA` file from a zip file. @@ -708,50 +647,6 @@ async fn read_metadata_async_seek( Ok(metadata) } -/// Like [`read_metadata_async_seek`], but doesn't use seek. -async fn read_metadata_async_stream( - filename: &WheelFilename, - debug_source: String, - reader: R, -) -> Result { - let reader = futures::io::BufReader::with_capacity(128 * 1024, reader); - let mut zip = async_zip::base::read::stream::ZipFileReader::new(reader); - - while let Some(mut entry) = zip - .next_with_entry() - .await - .map_err(|err| ErrorKind::Zip(filename.clone(), err))? - { - // Find the `METADATA` entry. - let path = entry - .reader() - .entry() - .filename() - .as_str() - .map_err(|err| ErrorKind::Zip(filename.clone(), err))?; - - if is_metadata_entry(path, filename) { - let mut reader = entry.reader_mut().compat(); - let mut contents = Vec::new(); - reader.read_to_end(&mut contents).await.unwrap(); - - let metadata = Metadata23::parse_metadata(&contents).map_err(|err| { - ErrorKind::MetadataParseError(filename.clone(), debug_source, Box::new(err)) - })?; - return Ok(metadata); - } - - // Close current file to get access to the next one. See docs: - // https://docs.rs/async_zip/0.0.16/async_zip/base/read/stream/ - zip = entry - .skip() - .await - .map_err(|err| ErrorKind::Zip(filename.clone(), err))?; - } - - Err(ErrorKind::MetadataNotFound(filename.clone(), debug_source).into()) -} - #[derive( Default, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, )] diff --git a/crates/uv-distribution/src/distribution_database.rs b/crates/uv-distribution/src/distribution_database.rs index ac8a81ebbe89..47f67773d927 100644 --- a/crates/uv-distribution/src/distribution_database.rs +++ b/crates/uv-distribution/src/distribution_database.rs @@ -208,6 +208,31 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { hashes: archive.hashes, filename: wheel.filename.clone(), }), + Err(Error::Client(err)) if err.is_http_streaming_unsupported() => { + warn!( + "Streaming unsupported for {dist}; downloading wheel to disk ({err})" + ); + + // If the request failed because streaming is unsupported, download the + // wheel directly. + let archive = self + .download_wheel( + url, + &wheel.filename, + wheel.file.size, + &wheel_entry, + dist, + hashes, + ) + .await?; + + Ok(LocalWheel { + dist: Dist::Built(dist.clone()), + archive: self.build_context.cache().archive(&archive.id), + hashes: archive.hashes, + filename: wheel.filename.clone(), + }) + } Err(Error::Extract(err)) => { if err.is_http_streaming_unsupported() { warn!( @@ -293,6 +318,36 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { filename: wheel.filename.clone(), }) } + Err(Error::Extract(err)) => { + if err.is_http_streaming_unsupported() { + warn!( + "Streaming unsupported for {dist}; downloading wheel to disk ({err})" + ); + } else if err.is_http_streaming_failed() { + warn!("Streaming failed for {dist}; downloading wheel to disk ({err})"); + } else { + return Err(Error::Extract(err)); + } + + // If the request failed because streaming is unsupported, download the + // wheel directly. + let archive = self + .download_wheel( + wheel.url.raw().clone(), + &wheel.filename, + None, + &wheel_entry, + dist, + hashes, + ) + .await?; + Ok(LocalWheel { + dist: Dist::Built(dist.clone()), + archive: self.build_context.cache().archive(&archive.id), + hashes: archive.hashes, + filename: wheel.filename.clone(), + }) + } Err(err) => Err(err), } } @@ -390,11 +445,11 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { match result { Ok(metadata) => Ok(ArchiveMetadata::from_metadata23(metadata)), - Err(err) if err.is_http_streaming_unsupported() => { - warn!("Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"); + Err(err) if err.is_http_range_requests_unsupported() => { + warn!("Range requests unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"); - // If the request failed due to an error that could be resolved by - // downloading the wheel directly, try that. + // If the request failed due to an error that could be resolved by downloading the + // wheel directly, try that. let wheel = self.get_wheel(dist, hashes).await?; let metadata = wheel.metadata()?; let hashes = wheel.hashes; diff --git a/crates/uv/tests/pip_sync.rs b/crates/uv/tests/pip_sync.rs index b080fcbf0cc2..d164d0a670a7 100644 --- a/crates/uv/tests/pip_sync.rs +++ b/crates/uv/tests/pip_sync.rs @@ -3227,7 +3227,6 @@ fn no_stream() -> Result<()> { ----- stderr ----- Resolved 1 package in [TIME] - Prepared 1 package in [TIME] Installed 1 package in [TIME] + hashb-foxglove-protocolbuffers-python==25.3.0.1.20240226043130+465630478360 "###