Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more information to wait-for-publish #11713

Merged
merged 7 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/cargo-test-support/src/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ fn substitute_macros(input: &str) -> String {
("[DOWNLOADING]", " Downloading"),
("[DOWNLOADED]", " Downloaded"),
("[UPLOADING]", " Uploading"),
("[UPLOADED]", " Uploaded"),
("[VERIFYING]", " Verifying"),
("[ARCHIVING]", " Archiving"),
("[INSTALLING]", " Installing"),
Expand All @@ -232,6 +233,7 @@ fn substitute_macros(input: &str) -> String {
("[EXECUTABLE]", " Executable"),
("[SKIPPING]", " Skipping"),
("[WAITING]", " Waiting"),
("[PUBLISHED]", " Published"),
];
let mut result = input.to_owned();
for &(pat, subst) in &macros {
Expand Down
2 changes: 1 addition & 1 deletion crates/cargo-test-support/src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub(crate) fn create_index_line(
json.to_string()
}

pub(crate) fn write_to_index(registry_path: &PathBuf, name: &str, line: String, local: bool) {
pub(crate) fn write_to_index(registry_path: &Path, name: &str, line: String, local: bool) {
let file = cargo_util::registry::make_dep_path(name, false);

// Write file/line in the index.
Expand Down
111 changes: 73 additions & 38 deletions crates/cargo-test-support/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::fmt;
use std::fs::{self, File};
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::thread::{self, JoinHandle};
use tar::{Builder, Header};
use time::format_description::well_known::Rfc3339;
Expand Down Expand Up @@ -98,6 +98,8 @@ pub struct RegistryBuilder {
configure_registry: bool,
/// API responders.
custom_responders: HashMap<&'static str, Box<dyn Send + Fn(&Request, &HttpServer) -> Response>>,
/// If nonzero, the git index update to be delayed by the given number of seconds.
delayed_index_update: usize,
}

pub struct TestRegistry {
Expand Down Expand Up @@ -157,6 +159,7 @@ impl RegistryBuilder {
configure_registry: true,
configure_token: true,
custom_responders: HashMap::new(),
delayed_index_update: 0,
}
}

Expand All @@ -171,6 +174,13 @@ impl RegistryBuilder {
self
}

/// Configures the git index update to be delayed by the given number of seconds.
#[must_use]
pub fn delayed_index_update(mut self, delay: usize) -> Self {
self.delayed_index_update = delay;
self
}

/// Sets whether or not to initialize as an alternative registry.
#[must_use]
pub fn alternative_named(mut self, alt: &str) -> Self {
Expand Down Expand Up @@ -265,6 +275,7 @@ impl RegistryBuilder {
token.clone(),
self.auth_required,
self.custom_responders,
self.delayed_index_update,
);
let index_url = if self.http_index {
server.index_url()
Expand Down Expand Up @@ -591,6 +602,7 @@ pub struct HttpServer {
token: Token,
auth_required: bool,
custom_responders: HashMap<&'static str, Box<dyn Send + Fn(&Request, &HttpServer) -> Response>>,
delayed_index_update: usize,
}

/// A helper struct that collects the arguments for [HttpServer::check_authorized].
Expand All @@ -613,6 +625,7 @@ impl HttpServer {
&'static str,
Box<dyn Send + Fn(&Request, &HttpServer) -> Response>,
>,
delayed_index_update: usize,
) -> HttpServerHandle {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
Expand All @@ -625,6 +638,7 @@ impl HttpServer {
token,
auth_required,
custom_responders: api_responders,
delayed_index_update,
};
let handle = Some(thread::spawn(move || server.start()));
HttpServerHandle { addr, handle }
Expand Down Expand Up @@ -1040,49 +1054,23 @@ impl HttpServer {
return self.unauthorized(req);
}

// Write the `.crate`
let dst = self
.dl_path
.join(&new_crate.name)
.join(&new_crate.vers)
.join("download");
t!(fs::create_dir_all(dst.parent().unwrap()));
t!(fs::write(&dst, file));

let deps = new_crate
.deps
.iter()
.map(|dep| {
let (name, package) = match &dep.explicit_name_in_toml {
Some(explicit) => (explicit.to_string(), Some(dep.name.to_string())),
None => (dep.name.to_string(), None),
};
serde_json::json!({
"name": name,
"req": dep.version_req,
"features": dep.features,
"default_features": true,
"target": dep.target,
"optional": dep.optional,
"kind": dep.kind,
"registry": dep.registry,
"package": package,
})
})
.collect::<Vec<_>>();

let line = create_index_line(
serde_json::json!(new_crate.name),
&new_crate.vers,
deps,
&file_cksum,
new_crate.features,
false,
new_crate.links,
None,
);

write_to_index(&self.registry_path, &new_crate.name, line, false);
if self.delayed_index_update == 0 {
save_new_crate(dst, new_crate, file, file_cksum, &self.registry_path);
} else {
let delayed_index_update = self.delayed_index_update;
let registry_path = self.registry_path.clone();
let file = Vec::from(file);
thread::spawn(move || {
thread::sleep(std::time::Duration::new(delayed_index_update as u64, 0));
save_new_crate(dst, new_crate, &file, file_cksum, &registry_path);
});
}

self.ok(&req)
} else {
Expand All @@ -1095,6 +1083,53 @@ impl HttpServer {
}
}

fn save_new_crate(
dst: PathBuf,
new_crate: crates_io::NewCrate,
file: &[u8],
file_cksum: String,
registry_path: &Path,
) {
// Write the `.crate`
t!(fs::create_dir_all(dst.parent().unwrap()));
t!(fs::write(&dst, file));

let deps = new_crate
.deps
.iter()
.map(|dep| {
let (name, package) = match &dep.explicit_name_in_toml {
Some(explicit) => (explicit.to_string(), Some(dep.name.to_string())),
None => (dep.name.to_string(), None),
};
serde_json::json!({
"name": name,
"req": dep.version_req,
"features": dep.features,
"default_features": true,
"target": dep.target,
"optional": dep.optional,
"kind": dep.kind,
"registry": dep.registry,
"package": package,
})
})
.collect::<Vec<_>>();

let line = create_index_line(
serde_json::json!(new_crate.name),
&new_crate.vers,
deps,
&file_cksum,
new_crate.features,
false,
new_crate.links,
None,
);

write_to_index(registry_path, &new_crate.name, line, false);
}

impl Package {
/// Creates a new package builder.
/// Call `publish()` to finalize and build the package.
Expand Down
11 changes: 11 additions & 0 deletions src/cargo/core/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub trait Source {
/// Ensure that the source is fully up-to-date for the current session on the next query.
fn invalidate_cache(&mut self);

/// If quiet, the source should not display any progress or status messages.
fn set_quiet(&mut self, quiet: bool);
epage marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +47 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, don't we cache sources generally (and the "wait" logic has to workaround that)? I feel like mutating a cached item like this is pretty brittle (in an already brittle system) and I'd hope we can find a way to avoid making it worse.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow. Sources are inherently mutable. I'm not sure I would say they are "cached" so much as they are lazily created as needed and held in the SourceMap. But by their nature they are accessed from the map and mutated as needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for the caching / lazy loading, I don't remember the details but I do remember running into a lot of problems implementing this because I was getting stale state due to caches that said "everything is loaded, no reason to talk to the server again".

But by their nature they are accessed from the map and mutated as needed.

From my quick glance, they are mutated as-needed in that you invalidate their cache and they reload. I didn't see any other real "state" to them that could leak from one operation to another. That is what I'm concerned about. As I said, I ran into a lot of problems with cargo assuming everything was only done once because commands are transient but when a command needed to do things multiple times, they broke. While I am not refreshed on all the details to map out whether its actually safe or not, this feels like one of those things that could easily cause people problems in the future.


/// Fetches the full package for each name and version specified.
fn download(&mut self, package: PackageId) -> CargoResult<MaybePackage>;

Expand Down Expand Up @@ -163,6 +166,10 @@ impl<'a, T: Source + ?Sized + 'a> Source for Box<T> {
(**self).invalidate_cache()
}

fn set_quiet(&mut self, quiet: bool) {
(**self).set_quiet(quiet)
}

/// Forwards to `Source::download`.
fn download(&mut self, id: PackageId) -> CargoResult<MaybePackage> {
(**self).download(id)
Expand Down Expand Up @@ -233,6 +240,10 @@ impl<'a, T: Source + ?Sized + 'a> Source for &'a mut T {
(**self).invalidate_cache()
}

fn set_quiet(&mut self, quiet: bool) {
(**self).set_quiet(quiet)
}

fn download(&mut self, id: PackageId) -> CargoResult<MaybePackage> {
(**self).download(id)
}
Expand Down
56 changes: 36 additions & 20 deletions src/cargo/ops/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::util::config::{Config, SslVersionConfig, SslVersionConfigRange};
use crate::util::errors::CargoResult;
use crate::util::important_paths::find_root_manifest_for_wd;
use crate::util::{truncate_with_ellipsis, IntoUrl};
use crate::util::{Progress, ProgressStyle};
use crate::{drop_print, drop_println, version};

/// Registry settings loaded from config files.
Expand Down Expand Up @@ -442,13 +443,29 @@ fn wait_for_publish(
) -> CargoResult<()> {
let version_req = format!("={}", pkg.version());
let mut source = SourceConfigMap::empty(config)?.load(registry_src, &HashSet::new())?;
let source_description = source.describe();
// Disable the source's built-in progress bars. Repeatedly showing a bunch
// of independent progress bars can be a little confusing. There is an
// overall progress bar managed here.
source.set_quiet(true);
let source_description = source.source_id().to_string();
epage marked this conversation as resolved.
Show resolved Hide resolved
let query = Dependency::parse(pkg.name(), Some(&version_req), registry_src)?;

let now = std::time::Instant::now();
let sleep_time = std::time::Duration::from_secs(1);
let mut logged = false;
loop {
let max = timeout.as_secs() as usize;
// Short does not include the registry name.
let short_pkg_description = format!("{} v{}", pkg.name(), pkg.version());
config.shell().status(
"Uploaded",
format!("{short_pkg_description} to {source_description}"),
)?;
config.shell().note(format!(
"Waiting for `{short_pkg_description}` to be available at {source_description}.\n\
You may press ctrl-c to skip waiting; the crate should be available shortly."
))?;
let mut progress = Progress::with_style("Waiting", ProgressStyle::Ratio, config);
epage marked this conversation as resolved.
Show resolved Hide resolved
progress.tick_now(0, max, "")?;
let is_available = loop {
{
let _lock = config.acquire_package_cache_lock()?;
// Force re-fetching the source
Expand All @@ -470,31 +487,30 @@ fn wait_for_publish(
}
};
if !summaries.is_empty() {
break;
break true;
}
}

if timeout < now.elapsed() {
let elapsed = now.elapsed();
if timeout < elapsed {
config.shell().warn(format!(
"timed out waiting for `{}` to be in {}",
pkg.name(),
source_description
"timed out waiting for `{short_pkg_description}` to be available in {source_description}",
))?;
break;
}

if !logged {
config.shell().status(
"Waiting",
format!(
"on `{}` to propagate to {} (ctrl-c to wait asynchronously)",
pkg.name(),
source_description
),
config.shell().note(
"The registry may have a backlog that is delaying making the \
crate available. The crate should be available soon.",
)?;
logged = true;
break false;
}

progress.tick_now(elapsed.as_secs() as usize, max, "")?;
std::thread::sleep(sleep_time);
};
if is_available {
config.shell().status(
"Published",
format!("{short_pkg_description} at {source_description}"),
)?;
}

Ok(())
Expand Down
6 changes: 5 additions & 1 deletion src/cargo/sources/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ impl<'cfg> Source for DirectorySource<'cfg> {
}

fn invalidate_cache(&mut self) {
// Path source has no local cache.
// Directory source has no local cache.
}

fn set_quiet(&mut self, _quiet: bool) {
// Directory source does not display status
}
}
16 changes: 12 additions & 4 deletions src/cargo/sources/git/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub struct GitSource<'cfg> {
path_source: Option<PathSource<'cfg>>,
ident: String,
config: &'cfg Config,
quiet: bool,
}

impl<'cfg> GitSource<'cfg> {
Expand All @@ -43,6 +44,7 @@ impl<'cfg> GitSource<'cfg> {
path_source: None,
ident,
config,
quiet: false,
};

Ok(source)
Expand Down Expand Up @@ -162,10 +164,12 @@ impl<'cfg> Source for GitSource<'cfg> {
self.remote.url()
);
}
self.config.shell().status(
"Updating",
format!("git repository `{}`", self.remote.url()),
)?;
if !self.quiet {
self.config.shell().status(
"Updating",
format!("git repository `{}`", self.remote.url()),
)?;
}

trace!("updating git source `{:?}`", self.remote);

Expand Down Expand Up @@ -233,6 +237,10 @@ impl<'cfg> Source for GitSource<'cfg> {
}

fn invalidate_cache(&mut self) {}

fn set_quiet(&mut self, quiet: bool) {
self.quiet = quiet;
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions src/cargo/sources/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,4 +574,8 @@ impl<'cfg> Source for PathSource<'cfg> {
fn invalidate_cache(&mut self) {
// Path source has no local cache.
}

fn set_quiet(&mut self, _quiet: bool) {
// Path source does not display status
}
}
Loading