Skip to content

Commit

Permalink
Wait for request stream to flush before returning resolution (#2374)
Browse files Browse the repository at this point in the history
## Summary

This is a more robust fix for
#2300.

The basic issue is:

- When we resolve, we attempt to pre-fetch the distribution metadata for
candidate packages.
- It's possible that the resolution completes _without_ those pre-fetch
responses. (In the linked issue, this was mainly because we were running
with `--no-deps`, but the pre-fetch was causing us to attempt to build a
package to get its dependencies. The resolution would then finish before
the build completed.)
- In that case, the `Index` will be marked as "waiting" for that
response -- but it'll never come through.
- If there's a subsequent call to the `Index`, to see if we should fetch
or are waiting for that response, we'll end up waiting for it forever,
since it _looks_ like it's in-flight (but isn't). (In the linked issue,
we had to build the source distribution for the install phase of `pip
install`, but `setuptools` was in this bad state from the _resolve_
phase.)

This PR modifies the resolver to ensure that we flush the stream of
requests before returning. Specifically, we now `join` rather than
`select` between the resolution and request-handling futures.

This _could_ be wasteful, since we don't _need_ those requests, but it
at least ensures that every `.wait` is followed by ` .done`. In
practice, I expect this not to have any significant effect on
performance, since we end up using the pre-fetched distributions almost
every time.

## Test Plan

I ran through the test plan from
#2373, but ran the build 10 times
and ensured it never crashed. (I reverted
#2373, since that _also_ fixes the
issue in the proximate case, by never fetching `setuptools` during the
resolve phase.)

I also added logging to verify that requests are being handled _after_
the resolution completes, as expected.

I also introduced an arbitrary error in `fetch` to ensure that the error
was immediately propagated.
  • Loading branch information
charliermarsh committed Mar 12, 2024
1 parent 96290bf commit 79ac3a2
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 31 deletions.
2 changes: 1 addition & 1 deletion crates/uv-resolver/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub enum ResolveError {
#[error(transparent)]
Client(#[from] uv_client::Error),

#[error("The channel is closed, was there a panic?")]
#[error("The channel closed unexpectedly")]
ChannelClosed,

#[error(transparent)]
Expand Down
57 changes: 27 additions & 30 deletions crates/uv-resolver/src/resolver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use pubgrub::error::PubGrubError;
use pubgrub::range::Range;
use pubgrub::solver::{Incompatibility, State};
use rustc_hash::{FxHashMap, FxHashSet};
use tokio::select;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, info_span, instrument, trace, Instrument};
use url::Url;
Expand Down Expand Up @@ -197,42 +196,40 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
let requests_fut = self.fetch(request_stream).fuse();

// Run the solver.
let resolve_fut = self.solve(&request_sink).fuse();
let resolve_fut = self.solve(request_sink).fuse();

let resolution = select! {
result = requests_fut => {
result?;
return Err(ResolveError::ChannelClosed);
// Wait for both to complete.
match tokio::try_join!(requests_fut, resolve_fut) {
Ok(((), resolution)) => {
self.on_complete();
Ok(resolution)
}
resolution = resolve_fut => {
resolution.map_err(|err| {
// Add version information to improve unsat error messages.
if let ResolveError::NoSolution(err) = err {
ResolveError::NoSolution(
err
.with_available_versions(&self.python_requirement, &self.visited, &self.index.packages)
.with_selector(self.selector.clone())
.with_python_requirement(&self.python_requirement)
.with_index_locations(self.provider.index_locations())
.with_unavailable_packages(&self.unavailable_packages)
Err(err) => {
// Add version information to improve unsat error messages.
Err(if let ResolveError::NoSolution(err) = err {
ResolveError::NoSolution(
err.with_available_versions(
&self.python_requirement,
&self.visited,
&self.index.packages,
)
} else {
err
}
})?
.with_selector(self.selector.clone())
.with_python_requirement(&self.python_requirement)
.with_index_locations(self.provider.index_locations())
.with_unavailable_packages(&self.unavailable_packages),
)
} else {
err
})
}
};

self.on_complete();

Ok(resolution)
}
}

/// Run the `PubGrub` solver.
#[instrument(skip_all)]
async fn solve(
&self,
request_sink: &tokio::sync::mpsc::Sender<Request>,
request_sink: tokio::sync::mpsc::Sender<Request>,
) -> Result<ResolutionGraph, ResolveError> {
let root = PubGrubPackage::Root(self.project.clone());

Expand All @@ -258,7 +255,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
// Pre-visit all candidate packages, to allow metadata to be fetched in parallel. If
// the dependency mode is direct, we only need to visit the root package.
if self.dependency_mode.is_transitive() {
Self::pre_visit(state.partial_solution.prioritized_packages(), request_sink)
Self::pre_visit(state.partial_solution.prioritized_packages(), &request_sink)
.await?;
}

Expand Down Expand Up @@ -294,7 +291,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
&next,
term_intersection.unwrap_positive(),
&mut pins,
request_sink,
&request_sink,
)
.await?;

Expand Down Expand Up @@ -434,7 +431,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
// Retrieve that package dependencies.
let package = &next;
let dependencies = match self
.get_dependencies(package, &version, &mut priorities, request_sink)
.get_dependencies(package, &version, &mut priorities, &request_sink)
.await?
{
Dependencies::Unavailable(reason) => {
Expand Down

0 comments on commit 79ac3a2

Please sign in to comment.