From 79ac3a2a7e1d966ebee0083f97a33e854285c414 Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Tue, 12 Mar 2024 07:13:57 -0700 Subject: [PATCH] Wait for request stream to flush before returning resolution (#2374) ## Summary This is a more robust fix for https://github.com/astral-sh/uv/issues/2300. The basic issue is: - When we resolve, we attempt to pre-fetch the distribution metadata for candidate packages. - It's possible that the resolution completes _without_ those pre-fetch responses. (In the linked issue, this was mainly because we were running with `--no-deps`, but the pre-fetch was causing us to attempt to build a package to get its dependencies. The resolution would then finish before the build completed.) - In that case, the `Index` will be marked as "waiting" for that response -- but it'll never come through. - If there's a subsequent call to the `Index`, to see if we should fetch or are waiting for that response, we'll end up waiting for it forever, since it _looks_ like it's in-flight (but isn't). (In the linked issue, we had to build the source distribution for the install phase of `pip install`, but `setuptools` was in this bad state from the _resolve_ phase.) This PR modifies the resolver to ensure that we flush the stream of requests before returning. Specifically, we now `join` rather than `select` between the resolution and request-handling futures. This _could_ be wasteful, since we don't _need_ those requests, but it at least ensures that every `.wait` is followed by ` .done`. In practice, I expect this not to have any significant effect on performance, since we end up using the pre-fetched distributions almost every time. ## Test Plan I ran through the test plan from https://github.com/astral-sh/uv/pull/2373, but ran the build 10 times and ensured it never crashed. (I reverted https://github.com/astral-sh/uv/pull/2373, since that _also_ fixes the issue in the proximate case, by never fetching `setuptools` during the resolve phase.) I also added logging to verify that requests are being handled _after_ the resolution completes, as expected. I also introduced an arbitrary error in `fetch` to ensure that the error was immediately propagated. --- crates/uv-resolver/src/error.rs | 2 +- crates/uv-resolver/src/resolver/mod.rs | 57 ++++++++++++-------------- 2 files changed, 28 insertions(+), 31 deletions(-) diff --git a/crates/uv-resolver/src/error.rs b/crates/uv-resolver/src/error.rs index 9f3857dbb28e..be9d3e59e362 100644 --- a/crates/uv-resolver/src/error.rs +++ b/crates/uv-resolver/src/error.rs @@ -27,7 +27,7 @@ pub enum ResolveError { #[error(transparent)] Client(#[from] uv_client::Error), - #[error("The channel is closed, was there a panic?")] + #[error("The channel closed unexpectedly")] ChannelClosed, #[error(transparent)] diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs index 4c8f9214a09b..a0b1c58e4602 100644 --- a/crates/uv-resolver/src/resolver/mod.rs +++ b/crates/uv-resolver/src/resolver/mod.rs @@ -12,7 +12,6 @@ use pubgrub::error::PubGrubError; use pubgrub::range::Range; use pubgrub::solver::{Incompatibility, State}; use rustc_hash::{FxHashMap, FxHashSet}; -use tokio::select; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, info_span, instrument, trace, Instrument}; use url::Url; @@ -197,42 +196,40 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { let requests_fut = self.fetch(request_stream).fuse(); // Run the solver. - let resolve_fut = self.solve(&request_sink).fuse(); + let resolve_fut = self.solve(request_sink).fuse(); - let resolution = select! { - result = requests_fut => { - result?; - return Err(ResolveError::ChannelClosed); + // Wait for both to complete. + match tokio::try_join!(requests_fut, resolve_fut) { + Ok(((), resolution)) => { + self.on_complete(); + Ok(resolution) } - resolution = resolve_fut => { - resolution.map_err(|err| { - // Add version information to improve unsat error messages. - if let ResolveError::NoSolution(err) = err { - ResolveError::NoSolution( - err - .with_available_versions(&self.python_requirement, &self.visited, &self.index.packages) - .with_selector(self.selector.clone()) - .with_python_requirement(&self.python_requirement) - .with_index_locations(self.provider.index_locations()) - .with_unavailable_packages(&self.unavailable_packages) + Err(err) => { + // Add version information to improve unsat error messages. + Err(if let ResolveError::NoSolution(err) = err { + ResolveError::NoSolution( + err.with_available_versions( + &self.python_requirement, + &self.visited, + &self.index.packages, ) - } else { - err - } - })? + .with_selector(self.selector.clone()) + .with_python_requirement(&self.python_requirement) + .with_index_locations(self.provider.index_locations()) + .with_unavailable_packages(&self.unavailable_packages), + ) + } else { + err + }) } - }; - - self.on_complete(); - - Ok(resolution) + } } /// Run the `PubGrub` solver. #[instrument(skip_all)] async fn solve( &self, - request_sink: &tokio::sync::mpsc::Sender, + request_sink: tokio::sync::mpsc::Sender, ) -> Result { let root = PubGrubPackage::Root(self.project.clone()); @@ -258,7 +255,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { // Pre-visit all candidate packages, to allow metadata to be fetched in parallel. If // the dependency mode is direct, we only need to visit the root package. if self.dependency_mode.is_transitive() { - Self::pre_visit(state.partial_solution.prioritized_packages(), request_sink) + Self::pre_visit(state.partial_solution.prioritized_packages(), &request_sink) .await?; } @@ -294,7 +291,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { &next, term_intersection.unwrap_positive(), &mut pins, - request_sink, + &request_sink, ) .await?; @@ -434,7 +431,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { // Retrieve that package dependencies. let package = &next; let dependencies = match self - .get_dependencies(package, &version, &mut priorities, request_sink) + .get_dependencies(package, &version, &mut priorities, &request_sink) .await? { Dependencies::Unavailable(reason) => {