diff --git a/Cargo.toml b/Cargo.toml
index 2a26ef6441b..fa995bef17e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,7 +23,6 @@ atty = "0.2"
 bytesize = "1.0"
 cargo-platform = { path = "crates/cargo-platform", version = "0.1.1" }
 crates-io = { path = "crates/crates-io", version = "0.31" }
-crossbeam-channel = "0.4"
 crossbeam-utils = "0.7"
 crypto-hash = "0.3.1"
 curl = { version = "0.4.23", features = ["http2"] }
diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs
index 60ecaa31750..039d93d50d7 100644
--- a/src/cargo/core/compiler/job_queue.rs
+++ b/src/cargo/core/compiler/job_queue.rs
@@ -58,7 +58,6 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::format_err;
-use crossbeam_channel::{unbounded, Receiver, Sender};
 use crossbeam_utils::thread::Scope;
 use jobserver::{Acquired, Client, HelperThread};
 use log::{debug, info, trace};
@@ -73,6 +72,7 @@ use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
 use crate::core::{PackageId, TargetKind};
 use crate::util;
 use crate::util::diagnostic_server::{self, DiagnosticPrinter};
+use crate::util::Queue;
 use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder};
 use crate::util::{Config, DependencyQueue};
 use crate::util::{Progress, ProgressStyle};
@@ -93,13 +93,34 @@ pub struct JobQueue<'a, 'cfg> {
 ///
 /// It is created from JobQueue when we have fully assembled the crate graph
 /// (i.e., all package dependencies are known).
+///
+/// # Message queue
+///
+/// Each thread running a process uses the message queue to send messages back
+/// to the main thread. The main thread coordinates everything, and handles
+/// printing output.
+///
+/// It is important to be careful which messages use `push` vs `push_bounded`.
+/// `push` is for priority messages (like tokens, or "finished") where the
+/// sender shouldn't block. We want to handle those so that real work can
+/// proceed ASAP.
+///
+/// `push_bounded` is only for messages being printed to stdout/stderr. Being
+/// bounded prevents a flood of messages from consuming a large amount of
+/// memory.
+///
+/// `push` also avoids blocking, which helps avoid deadlocks. For example, when
+/// the diagnostic server thread is dropped, it waits for the thread to exit.
+/// But if the thread is blocked on a full queue, and there is a critical
+/// error, the drop will deadlock. This should be fixed at some point in the
+/// future. The jobserver thread has a similar problem, though it will time
+/// out after 1 second.
 struct DrainState<'a, 'cfg> {
     // This is the length of the DependencyQueue when starting out
     total_units: usize,
 
     queue: DependencyQueue<Unit<'a>, Artifact, Job>,
-    tx: Sender<Message>,
-    rx: Receiver<Message>,
+    messages: Arc<Queue<Message>>,
     active: HashMap<JobId, Unit<'a>>,
     compiled: HashSet<PackageId>,
     documented: HashSet<PackageId>,
@@ -145,7 +166,7 @@ impl std::fmt::Display for JobId {
 
 pub struct JobState<'a> {
     /// Channel back to the main thread to coordinate messages and such.
-    tx: Sender<Message>,
+    messages: Arc<Queue<Message>>,
 
     /// The job id that this state is associated with, used when sending
     /// messages back to the main thread.
@@ -199,7 +220,7 @@ enum Message {
 
 impl<'a> JobState<'a> {
     pub fn running(&self, cmd: &ProcessBuilder) {
-        let _ = self.tx.send(Message::Run(self.id, cmd.to_string()));
+        self.messages.push(Message::Run(self.id, cmd.to_string()));
     }
 
     pub fn build_plan(
@@ -208,17 +229,16 @@ impl<'a> JobState<'a> {
         cmd: ProcessBuilder,
         filenames: Arc<Vec<OutputFile>>,
     ) {
-        let _ = self
-            .tx
-            .send(Message::BuildPlanMsg(module_name, cmd, filenames));
+        self.messages
+            .push(Message::BuildPlanMsg(module_name, cmd, filenames));
     }
 
     pub fn stdout(&self, stdout: String) {
-        drop(self.tx.send(Message::Stdout(stdout)));
+        self.messages.push_bounded(Message::Stdout(stdout));
     }
 
     pub fn stderr(&self, stderr: String) {
-        drop(self.tx.send(Message::Stderr(stderr)));
+        self.messages.push_bounded(Message::Stderr(stderr));
     }
 
     /// A method used to signal to the coordinator thread that the rmeta file
@@ -228,9 +248,8 @@ impl<'a> JobState<'a> {
     /// produced once!
     pub fn rmeta_produced(&self) {
         self.rmeta_required.set(false);
-        let _ = self
-            .tx
-            .send(Message::Finish(self.id, Artifact::Metadata, Ok(())));
+        self.messages
+            .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
     }
 
     /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
@@ -239,14 +258,14 @@ impl<'a> JobState<'a> {
     /// This should arrange for the associated client to eventually get a token via
     /// `client.release_raw()`.
     pub fn will_acquire(&self) {
-        let _ = self.tx.send(Message::NeedsToken(self.id));
+        self.messages.push(Message::NeedsToken(self.id));
     }
 
     /// The rustc underlying this Job is informing us that it is done with a jobserver token.
     ///
     /// Note that it does *not* write that token back anywhere.
    pub fn release_token(&self) {
-        let _ = self.tx.send(Message::ReleaseToken(self.id));
+        self.messages.push(Message::ReleaseToken(self.id));
    }
 }
 
@@ -340,13 +359,15 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         let _p = profile::start("executing the job graph");
         self.queue.queue_finished();
 
-        let (tx, rx) = unbounded();
         let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
         let state = DrainState {
             total_units: self.queue.len(),
             queue: self.queue,
-            tx,
-            rx,
+            // 100 here is somewhat arbitrary. It is a few screenfuls of
+            // output, and hopefully at most a few megabytes of memory for
+            // typical messages. If you change this, please update the test
+            // caching_large_output, too.
+            messages: Arc::new(Queue::new(100)),
             active: HashMap::new(),
             compiled: HashSet::new(),
             documented: HashSet::new(),
@@ -354,7 +375,6 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
             progress,
             next_id: 0,
             timings: self.timings,
-
             tokens: Vec::new(),
             rustc_tokens: HashMap::new(),
             to_send_clients: BTreeMap::new(),
@@ -364,25 +384,28 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         };
 
         // Create a helper thread for acquiring jobserver tokens
-        let tx = state.tx.clone();
+        let messages = state.messages.clone();
         let helper = cx
             .jobserver
             .clone()
             .into_helper_thread(move |token| {
-                drop(tx.send(Message::Token(token)));
+                drop(messages.push(Message::Token(token)));
             })
             .chain_err(|| "failed to create helper thread for jobserver management")?;
 
         // Create a helper thread to manage the diagnostics for rustfix if
         // necessary.
-        let tx = state.tx.clone();
+        let messages = state.messages.clone();
+        // It is important that this uses `push` instead of `push_bounded` for
+        // now. If someone wants to fix this to be bounded, the `drop`
+        // implementation needs to be changed to avoid possible deadlocks.
         let _diagnostic_server = cx
             .bcx
             .build_config
             .rustfix_diagnostic_server
             .borrow_mut()
             .take()
-            .map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg)))));
+            .map(move |srv| srv.start(move |msg| drop(messages.push(Message::FixDiagnostic(msg)))));
 
         crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper))
             .expect("child threads shouldn't panic")
@@ -584,7 +607,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
         // to run above to calculate CPU usage over time. To do this we
         // listen for a message with a timeout, and on timeout we run the
         // previous parts of the loop again.
-        let events: Vec<_> = self.rx.try_iter().collect();
+        let mut events = self.messages.try_pop_all();
         info!(
             "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
             self.tokens.len(),
@@ -602,14 +625,16 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
             loop {
                 self.tick_progress();
                 self.tokens.truncate(self.active.len() - 1);
-                match self.rx.recv_timeout(Duration::from_millis(500)) {
-                    Ok(message) => break vec![message],
-                    Err(_) => continue,
+                match self.messages.pop(Duration::from_millis(500)) {
+                    Some(message) => {
+                        events.push(message);
+                        break;
+                    }
+                    None => continue,
                 }
             }
-        } else {
-            events
         }
+        return events;
     }
 
     fn drain_the_queue(
@@ -756,7 +781,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
         assert!(self.active.insert(id, *unit).is_none());
         *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
 
-        let my_tx = self.tx.clone();
+        let messages = self.messages.clone();
         let fresh = job.freshness();
         let rmeta_required = cx.rmeta_required(unit);
 
@@ -768,13 +793,13 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
         let doit = move || {
             let state = JobState {
                 id,
-                tx: my_tx.clone(),
+                messages: messages.clone(),
                 rmeta_required: Cell::new(rmeta_required),
                 _marker: marker::PhantomData,
             };
 
             let mut sender = FinishOnDrop {
-                tx: &my_tx,
+                messages: &messages,
                 id,
                 result: Err(format_err!("worker panicked")),
             };
@@ -793,9 +818,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
             // we need to make sure that the metadata is flagged as produced so
             // send a synthetic message here.
             if state.rmeta_required.get() && sender.result.is_ok() {
-                my_tx
-                    .send(Message::Finish(id, Artifact::Metadata, Ok(())))
-                    .unwrap();
+                messages.push(Message::Finish(id, Artifact::Metadata, Ok(())));
             }
 
             // Use a helper struct with a `Drop` implementation to guarantee
@@ -803,7 +826,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
             // shouldn't panic unless there's a bug in Cargo, so we just need
             // to make sure nothing hangs by accident.
             struct FinishOnDrop<'a> {
-                tx: &'a Sender<Message>,
+                messages: &'a Queue<Message>,
                 id: JobId,
                 result: CargoResult<()>,
             }
@@ -811,21 +834,17 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
             impl Drop for FinishOnDrop<'_> {
                 fn drop(&mut self) {
                     let msg = mem::replace(&mut self.result, Ok(()));
-                    drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg)));
+                    self.messages
+                        .push(Message::Finish(self.id, Artifact::All, msg));
                 }
             }
         };
 
         match fresh {
-            Freshness::Fresh => {
-                self.timings.add_fresh();
-                doit();
-            }
-            Freshness::Dirty => {
-                self.timings.add_dirty();
-                scope.spawn(move |_| doit());
-            }
+            Freshness::Fresh => self.timings.add_fresh(),
+            Freshness::Dirty => self.timings.add_dirty(),
         }
+        scope.spawn(move |_| doit());
 
         Ok(())
     }
diff --git a/src/cargo/util/mod.rs b/src/cargo/util/mod.rs
index b3605052321..45e44ba61cf 100644
--- a/src/cargo/util/mod.rs
+++ b/src/cargo/util/mod.rs
@@ -18,6 +18,7 @@ pub use self::paths::{bytes2path, dylib_path, join_paths, path2bytes};
 pub use self::paths::{dylib_path_envvar, normalize_path};
 pub use self::process_builder::{process, ProcessBuilder};
 pub use self::progress::{Progress, ProgressStyle};
+pub use self::queue::Queue;
 pub use self::read2::read2;
 pub use self::restricted_names::validate_package_name;
 pub use self::rustc::Rustc;
@@ -51,6 +52,7 @@ pub mod paths;
 pub mod process_builder;
 pub mod profile;
 mod progress;
+mod queue;
 mod read2;
 pub mod restricted_names;
 pub mod rustc;
diff --git a/src/cargo/util/queue.rs b/src/cargo/util/queue.rs
new file mode 100644
index 00000000000..66554ea5921
--- /dev/null
+++ b/src/cargo/util/queue.rs
@@ -0,0 +1,75 @@
+use std::collections::VecDeque;
+use std::sync::{Condvar, Mutex};
+use std::time::Duration;
+
+/// A simple, thread-safe queue of items of type `T`.
+///
+/// This is a sort of channel where any thread can push to a queue and any
+/// thread can pop from a queue.
+///
+/// This supports both bounded and unbounded operations. `push` will never block,
+/// and allows the queue to grow without bounds. `push_bounded` will block if the
+/// queue is over capacity, and will resume once there is enough capacity.
+pub struct Queue<T> {
+    state: Mutex<State<T>>,
+    popper_cv: Condvar,
+    bounded_cv: Condvar,
+    bound: usize,
+}
+
+struct State<T> {
+    items: VecDeque<T>,
+}
+
+impl<T> Queue<T> {
+    pub fn new(bound: usize) -> Queue<T> {
+        Queue {
+            state: Mutex::new(State {
+                items: VecDeque::new(),
+            }),
+            popper_cv: Condvar::new(),
+            bounded_cv: Condvar::new(),
+            bound,
+        }
+    }
+
+    pub fn push(&self, item: T) {
+        self.state.lock().unwrap().items.push_back(item);
+        self.popper_cv.notify_one();
+    }
+
+    /// Pushes an item onto the queue, blocking if the queue is full.
+    pub fn push_bounded(&self, item: T) {
+        let locked_state = self.state.lock().unwrap();
+        let mut state = self
+            .bounded_cv
+            .wait_while(locked_state, |s| s.items.len() >= self.bound)
+            .unwrap();
+        state.items.push_back(item);
+        self.popper_cv.notify_one();
+    }
+
+    pub fn pop(&self, timeout: Duration) -> Option<T> {
+        let (mut state, result) = self
+            .popper_cv
+            .wait_timeout_while(self.state.lock().unwrap(), timeout, |s| s.items.is_empty())
+            .unwrap();
+        if result.timed_out() {
+            None
+        } else {
+            let value = state.items.pop_front()?;
+            if state.items.len() < self.bound {
+                // Assumes threads cannot be canceled.
+                self.bounded_cv.notify_one();
+            }
+            Some(value)
+        }
+    }
+
+    pub fn try_pop_all(&self) -> Vec<T> {
+        let mut state = self.state.lock().unwrap();
+        let result = state.items.drain(..).collect();
+        self.bounded_cv.notify_all();
+        result
+    }
+}
diff --git a/tests/testsuite/cache_messages.rs b/tests/testsuite/cache_messages.rs
index eb31255b256..bc5f365b90a 100644
--- a/tests/testsuite/cache_messages.rs
+++ b/tests/testsuite/cache_messages.rs
@@ -437,3 +437,62 @@ line 2
     )
     .run();
 }
+
+#[cargo_test]
+fn caching_large_output() {
+    // Handles large number of messages.
+    // This is an arbitrary amount that is greater than the 100 used in
+    // job_queue. This is here to check for deadlocks or any other problems.
+    const COUNT: usize = 250;
+    let rustc = project()
+        .at("rustc")
+        .file("Cargo.toml", &basic_manifest("rustc_alt", "1.0.0"))
+        .file(
+            "src/main.rs",
+            &format!(
+                r#"
+                fn main() {{
+                    for i in 0..{} {{
+                        eprintln!("{{{{\"message\": \"test message {{}}\", \"level\": \"warning\", \
+                            \"spans\": [], \"children\": [], \"rendered\": \"test message {{}}\"}}}}",
+                            i, i);
+                    }}
+                    let r = std::process::Command::new("rustc")
+                        .args(std::env::args_os().skip(1))
+                        .status();
+                    std::process::exit(r.unwrap().code().unwrap_or(2));
+                }}
+                "#,
+                COUNT
+            ),
+        )
+        .build();
+
+    let mut expected = String::new();
+    for i in 0..COUNT {
+        expected.push_str(&format!("test message {}\n", i));
+    }
+
+    rustc.cargo("build").run();
+    let p = project().file("src/lib.rs", "").build();
+    p.cargo("check")
+        .env("RUSTC", rustc.bin("rustc_alt"))
+        .with_stderr(&format!(
+            "\
+[CHECKING] foo [..]
+{}[FINISHED] dev [..]
+",
+            expected
+        ))
+        .run();
+
+    p.cargo("check")
+        .env("RUSTC", rustc.bin("rustc_alt"))
+        .with_stderr(&format!(
+            "\
+{}[FINISHED] dev [..]
+",
+            expected
+        ))
+        .run();
+}
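
A minimal sketch of how the two push modes interact, for anyone reviewing the
queue semantics. This is illustrative only and not part of the patch; it
assumes the `Queue` type from src/cargo/util/queue.rs above is in scope, and
uses a hypothetical bound of 2 in place of the 100 used in job_queue:

    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    fn main() {
        // Small bound so the over-capacity behavior is easy to observe.
        let queue = Arc::new(Queue::new(2));

        // `push` never blocks, even past the bound, so priority messages
        // (tokens, "finished") always get through immediately.
        queue.push("token");
        queue.push("finished");
        queue.push("extra"); // queue now holds 3 items, beyond the bound of 2

        // `push_bounded` blocks while the queue is at or over the bound;
        // this models stdout/stderr messages that must not pile up in memory.
        let q = Arc::clone(&queue);
        let producer = thread::spawn(move || q.push_bounded("stdout line"));

        // Popping drains the queue; once its length drops below the bound,
        // `pop` notifies `bounded_cv` and the blocked producer completes.
        // `pop` returns None after 100ms of inactivity, ending the loop.
        while let Some(msg) = queue.pop(Duration::from_millis(100)) {
            println!("{}", msg);
        }
        producer.join().unwrap();
    }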