diff --git a/Cargo.lock b/Cargo.lock index 71edd66e8d69..5fd21321deb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4795,6 +4795,7 @@ dependencies = [ "indoc", "insta", "nanoid", + "owo-colors", "pep440_rs", "pep508_rs", "platform-tags", diff --git a/crates/uv-cache/src/lib.rs b/crates/uv-cache/src/lib.rs index 0f5c34eaa34f..bfcc822c403d 100644 --- a/crates/uv-cache/src/lib.rs +++ b/crates/uv-cache/src/lib.rs @@ -45,6 +45,11 @@ impl CacheEntry { Self(path.into()) } + /// Return the cache entry's parent directory. + pub fn shard(&self) -> CacheShard { + CacheShard(self.dir().to_path_buf()) + } + /// Convert the [`CacheEntry`] into a [`PathBuf`]. #[inline] pub fn into_path_buf(self) -> PathBuf { diff --git a/crates/uv-distribution/Cargo.toml b/crates/uv-distribution/Cargo.toml index eee780ec11f1..d44386b2a220 100644 --- a/crates/uv-distribution/Cargo.toml +++ b/crates/uv-distribution/Cargo.toml @@ -37,6 +37,7 @@ anyhow = { workspace = true } fs-err = { workspace = true } futures = { workspace = true } nanoid = { workspace = true } +owo-colors = { workspace = true } reqwest = { workspace = true } reqwest-middleware = { workspace = true } rmp-serde = { workspace = true } diff --git a/crates/uv-distribution/src/error.rs b/crates/uv-distribution/src/error.rs index 3a0c12b7f688..36fa853339c0 100644 --- a/crates/uv-distribution/src/error.rs +++ b/crates/uv-distribution/src/error.rs @@ -1,5 +1,6 @@ use std::path::PathBuf; +use owo_colors::OwoColorize; use tokio::task::JoinError; use url::Url; use zip::result::ZipError; @@ -93,6 +94,8 @@ pub enum Error { MetadataLowering(#[from] MetadataError), #[error("Distribution not found at: {0}")] NotFound(Url), + #[error("Attempted to re-extract the source distribution for `{0}`, but the hashes didn't match. Run `{}` to clear the cache.", "uv cache clean".green())] + CacheHeal(String), /// A generic request middleware error happened while making a request. /// Refer to the error message for more details. 
diff --git a/crates/uv-distribution/src/source/mod.rs b/crates/uv-distribution/src/source/mod.rs index 9bcf1a7d26a8..c4c40a01a9fb 100644 --- a/crates/uv-distribution/src/source/mod.rs +++ b/crates/uv-distribution/src/source/mod.rs @@ -23,7 +23,7 @@ use platform_tags::Tags; use pypi_types::{HashDigest, Metadata12, Metadata23, RequiresTxt}; use reqwest::Response; use tokio_util::compat::FuturesAsyncReadCompatExt; -use tracing::{debug, info_span, instrument, Instrument}; +use tracing::{debug, info_span, instrument, warn, Instrument}; use url::Url; use uv_cache::{Cache, CacheBucket, CacheEntry, CacheShard, Removal, WheelCache}; use uv_cache_info::CacheInfo; @@ -425,6 +425,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { // Scope all operations to the revision. Within the revision, there's no need to check for // freshness, since entries have to be fresher than the revision itself. let cache_shard = cache_shard.shard(revision.id()); + let source_dist_entry = cache_shard.entry(filename); // If there are build settings, we need to scope to a cache shard. let config_settings = self.build_context.config_settings(); @@ -439,13 +440,29 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { return Ok(built_wheel.with_hashes(revision.into_hashes())); } + // Otherwise, we need to build a wheel. Before building, ensure that the source is present. + let revision = if source_dist_entry.path().is_dir() { + revision + } else { + self.heal_url_revision( + source, + filename, + ext, + url, + &source_dist_entry, + revision, + hashes, + client, + ) + .await? + }; + let task = self .reporter .as_ref() .map(|reporter| reporter.on_build_start(source)); // Build the source distribution. 
- let source_dist_entry = cache_shard.entry(filename); let (disk_filename, wheel_filename, metadata) = self .build_distribution(source, source_dist_entry.path(), subdirectory, &cache_shard) .await?; @@ -527,6 +544,23 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { }); } + // Otherwise, we need a wheel. + let revision = if source_dist_entry.path().is_dir() { + revision + } else { + self.heal_url_revision( + source, + filename, + ext, + url, + &source_dist_entry, + revision, + hashes, + client, + ) + .await? + }; + // If there are build settings, we need to scope to a cache shard. let config_settings = self.build_context.config_settings(); let cache_shard = if config_settings.is_empty() { @@ -686,6 +720,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { // Scope all operations to the revision. Within the revision, there's no need to check for // freshness, since entries have to be fresher than the revision itself. let cache_shard = cache_shard.shard(revision.id()); + let source_entry = cache_shard.entry("source"); // If there are build settings, we need to scope to a cache shard. let config_settings = self.build_context.config_settings(); @@ -700,9 +735,14 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { return Ok(built_wheel); } - let source_entry = cache_shard.entry("source"); + // Otherwise, we need to build a wheel, which requires a source distribution. + let revision = if source_entry.path().is_dir() { + revision + } else { + self.heal_archive_revision(source, resource, &source_entry, revision, hashes) + .await? + }; - // Otherwise, we need to build a wheel. let task = self .reporter .as_ref() @@ -785,6 +825,14 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { }); } + // Otherwise, we need a source distribution. + let revision = if source_entry.path().is_dir() { + revision + } else { + self.heal_archive_revision(source, resource, &source_entry, revision, hashes) + .await? 
+ }; + // If the backend supports `prepare_metadata_for_build_wheel`, use it. + if let Some(metadata) = self .build_metadata(source, source_entry.path(), None) @@ -1346,6 +1394,66 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { )) } + /// Heal a [`Revision`] for a local archive. + async fn heal_archive_revision( + &self, + source: &BuildableSource<'_>, + resource: &PathSourceUrl<'_>, + entry: &CacheEntry, + revision: Revision, + hashes: HashPolicy<'_>, + ) -> Result<Revision, Error> { + warn!("Re-extracting missing source distribution: {source}"); + let hashes = self + .persist_archive(&resource.path, resource.ext, entry.path(), hashes) + .await?; + if hashes != revision.hashes() { + return Err(Error::CacheHeal(source.to_string())); + } + Ok(revision.with_hashes(hashes)) + } + + /// Heal a [`Revision`] for a remote archive. + async fn heal_url_revision( + &self, + source: &BuildableSource<'_>, + filename: &str, + ext: SourceDistExtension, + url: &Url, + entry: &CacheEntry, + revision: Revision, + hashes: HashPolicy<'_>, + client: &ManagedClient<'_>, + ) -> Result<Revision, Error> { + warn!("Re-downloading missing source distribution: {source}"); + let cache_entry = entry.shard().entry(HTTP_REVISION); + let download = |response| { + async { + let hashes = self + .download_archive(response, source, filename, ext, entry.path(), hashes) + .await?; + if hashes != revision.hashes() { + return Err(Error::CacheHeal(source.to_string())); + } + Ok(revision.with_hashes(hashes)) + } + .boxed_local() + .instrument(info_span!("download", source_dist = %source)) + }; + client + .managed(|client| async move { + client + .cached_client() + .skip_cache(Self::request(url.clone(), client)?, &cache_entry, download) + .await + .map_err(|err| match err { + CachedClientError::Callback(err) => err, + CachedClientError::Client(err) => Error::Client(err), + }) + }) + .await + } + /// Download and unzip a source distribution into the cache from an HTTP response.
async fn download_archive( &self, @@ -1395,9 +1503,14 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { fs_err::tokio::create_dir_all(target.parent().expect("Cache entry to have parent")) .await .map_err(Error::CacheWrite)?; - rename_with_retry(extracted, target) - .await - .map_err(Error::CacheWrite)?; + if let Err(err) = rename_with_retry(extracted, target).await { + // If the directory already exists, accept it. + if target.is_dir() { + warn!("Directory already exists: {}", target.display()); + } else { + return Err(Error::CacheWrite(err)); + } + } Ok(hashes) } @@ -1448,9 +1561,14 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { fs_err::tokio::create_dir_all(target.parent().expect("Cache entry to have parent")) .await .map_err(Error::CacheWrite)?; - rename_with_retry(extracted, &target) - .await - .map_err(Error::CacheWrite)?; + if let Err(err) = rename_with_retry(extracted, target).await { + // If the directory already exists, accept it. + if target.is_dir() { + warn!("Directory already exists: {}", target.display()); + } else { + return Err(Error::CacheWrite(err)); + } + } Ok(hashes) }