Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Prerequisites from dep graph refactoring #60559

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2780,6 +2780,7 @@ name = "rustc_data_structures"
version = "0.0.0"
dependencies = [
"cfg-if 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"ena 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
"graphviz 0.0.0",
"indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
Expand Down
1 change: 1 addition & 0 deletions src/librustc_data_structures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ rustc_cratesio_shim = { path = "../librustc_cratesio_shim" }
serialize = { path = "../libserialize" }
graphviz = { path = "../libgraphviz" }
cfg-if = "0.1.2"
crossbeam-utils = { version = "0.6.5", features = ["nightly"] }
stable_deref_trait = "1.0.0"
rayon = { version = "0.2.0", package = "rustc-rayon" }
rayon-core = { version = "0.2.0", package = "rustc-rayon-core" }
Expand Down
1 change: 1 addition & 0 deletions src/librustc_data_structures/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#![feature(stmt_expr_attributes)]
#![feature(core_intrinsics)]
#![feature(integer_atomics)]
#![feature(arbitrary_self_types)]

#![cfg_attr(unix, feature(libc))]
#![cfg_attr(test, feature(test))]
Expand Down
73 changes: 72 additions & 1 deletion src/librustc_data_structures/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use crate::owning_ref::{Erased, OwningRef};

pub mod worker;

pub fn serial_join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where A: FnOnce() -> RA,
B: FnOnce() -> RB
Expand Down Expand Up @@ -67,6 +69,71 @@ cfg_if! {
use std::ops::Add;
use std::panic::{resume_unwind, catch_unwind, AssertUnwindSafe};

#[derive(Debug)]
pub struct AtomicCell<T: Copy>(Cell<T>);

impl<T: Copy> AtomicCell<T> {
#[inline]
pub fn new(v: T) -> Self {
AtomicCell(Cell::new(v))
}

#[inline]
pub fn get_mut(&mut self) -> &mut T {
self.0.get_mut()
}
}

impl<T: Copy> AtomicCell<T> {
pub fn into_inner(self) -> T {
self.0.into_inner()
}

#[inline]
pub fn load(&self) -> T {
self.0.get()
}

#[inline]
pub fn store(&self, val: T) {
self.0.set(val)
}

pub fn swap(&self, val: T) -> T {
self.0.replace(val)
}
}

impl<T: Copy + PartialEq> AtomicCell<T> {
pub fn compare_and_swap(&self, current: T, new: T) -> T {
match self.compare_exchange(current, new) {
Ok(v) => v,
Err(v) => v,
}
}

pub fn compare_exchange(&self,
current: T,
new: T)
-> Result<T, T> {
let read = self.0.get();
if read == current {
self.0.set(new);
Ok(read)
} else {
Err(read)
}
}
}

impl<T: Add<Output=T> + Copy> AtomicCell<T> {
pub fn fetch_add(&self, val: T) -> T {
let old = self.0.get();
self.0.set(old + val);
old
}
}

#[derive(Debug)]
pub struct Atomic<T: Copy>(Cell<T>);

Expand All @@ -77,7 +144,7 @@ cfg_if! {
}
}

impl<T: Copy + PartialEq> Atomic<T> {
impl<T: Copy> Atomic<T> {
pub fn into_inner(self) -> T {
self.0.into_inner()
}
Expand All @@ -95,7 +162,9 @@ cfg_if! {
pub fn swap(&self, val: T, _: Ordering) -> T {
self.0.replace(val)
}
}

impl<T: Copy + PartialEq> Atomic<T> {
pub fn compare_exchange(&self,
current: T,
new: T,
Expand Down Expand Up @@ -271,6 +340,8 @@ cfg_if! {

pub use std::sync::atomic::{AtomicBool, AtomicUsize, AtomicU32, AtomicU64};

pub use crossbeam_utils::atomic::AtomicCell;

pub use std::sync::Arc as Lrc;
pub use std::sync::Weak as Weak;

Expand Down
169 changes: 169 additions & 0 deletions src/librustc_data_structures/sync/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use super::{Lrc, Lock};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add module level documentation here that describes what this is? In particular:

  • When would I use this? What are the typical use cases?
  • How do I use it? A usage example would be good.
  • Do I instantiate it myself? Is there already an executor available in the context somewhere?
  • What is the usage protocol? Do I need to worry about deadlocks? Resource management?
  • Does it integrate with the jobserver?
  • Is there anything else that users should be aware of?


pub trait Worker: super::Send {
type Message: super::Send;
type Result: super::Send;

fn message(&mut self, msg: Self::Message);
fn complete(self) -> Self::Result;
}

pub use executor::WorkerExecutor;

#[cfg(parallel_compiler)]
mod executor {
use super::*;
use crate::jobserver;
use parking_lot::Condvar;
use std::mem;

struct WorkerQueue<T: Worker> {
scheduled: bool,
complete: bool,
messages: Vec<T::Message>,
result: Option<T::Result>,
}

/// Allows executing a worker on any Rayon thread,
/// sending it messages and waiting for it to complete its computation.
pub struct WorkerExecutor<T: Worker> {
queue: Lock<WorkerQueue<T>>,
worker: Lock<Option<T>>,
#[cfg(parallel_compiler)]
cond_var: Condvar,
}

impl<T: Worker> WorkerExecutor<T> {
pub fn new(worker: T) -> Self {
WorkerExecutor {
queue: Lock::new(WorkerQueue {
scheduled: false,
complete: false,
messages: Vec::new(),
result: None,
}),
worker: Lock::new(Some(worker)),
#[cfg(parallel_compiler)]
cond_var: Condvar::new(),
}
}

fn try_run_worker(&self) {
if let Some(mut worker) = self.worker.try_lock() {
self.run_worker(&mut *worker);
}
}

fn run_worker(&self, worker: &mut Option<T>) {
let worker_ref = if let Some(worker_ref) = worker.as_mut() {
worker_ref
} else {
return
};
loop {
let msgs = {
let mut queue = self.queue.lock();
let msgs = mem::replace(&mut queue.messages, Vec::new());
if msgs.is_empty() {
queue.scheduled = false;
if queue.complete {
queue.result = Some(worker.take().unwrap().complete());
self.cond_var.notify_all();
}
break;
}
msgs
};
for msg in msgs {
worker_ref.message(msg);
}
}
}

pub fn complete(&self) -> T::Result {
let mut queue = self.queue.lock();
assert!(!queue.complete);
queue.complete = true;
if !queue.scheduled {
// The worker is not scheduled to run, just run it on the current thread.
queue.scheduled = true;
mem::drop(queue);
self.run_worker(&mut *self.worker.lock());
queue = self.queue.lock();
} else if let Some(mut worker) = self.worker.try_lock() {
// Try to run the worker on the current thread.
// It was scheduled to run, but it may not have started yet.
// If we are using a single thread, it may never start at all.
mem::drop(queue);
self.run_worker(&mut *worker);
queue = self.queue.lock();
} else {
// The worker must be running on some other thread,
// and will eventually look at the queue again, since queue.scheduled is true.
// Wait for it.

#[cfg(parallel_compiler)]
{
// Wait for the result
jobserver::release_thread();
self.cond_var.wait(&mut queue);
jobserver::acquire_thread();
}
}
queue.result.take().unwrap()
}

fn queue_message(&self, msg: T::Message) -> bool {
let mut queue = self.queue.lock();
queue.messages.push(msg);
let was_scheduled = queue.scheduled;
if !was_scheduled {
queue.scheduled = true;
}
was_scheduled
}

pub fn message_in_pool(self: &Lrc<Self>, msg: T::Message)
where
T: 'static
{
if !self.queue_message(msg) {
let this = self.clone();
#[cfg(parallel_compiler)]
rayon::spawn(move || this.try_run_worker());
#[cfg(not(parallel_compiler))]
this.try_run_worker();
}
}
}
}

#[cfg(not(parallel_compiler))]
mod executor {
use super::*;

pub struct WorkerExecutor<T: Worker> {
worker: Lock<Option<T>>,
}

impl<T: Worker> WorkerExecutor<T> {
pub fn new(worker: T) -> Self {
WorkerExecutor {
worker: Lock::new(Some(worker)),
}
}

#[inline]
pub fn complete(&self) -> T::Result {
self.worker.lock().take().unwrap().complete()
}

#[inline]
pub fn message_in_pool(self: &Lrc<Self>, msg: T::Message)
where
T: 'static
{
self.worker.lock().as_mut().unwrap().message(msg);
}
}
}
33 changes: 4 additions & 29 deletions src/librustc_typeck/coherence/inherent_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
//! `tcx.inherent_impls(def_id)`). That value, however,
//! is computed by selecting an idea from this table.

use rustc::dep_graph::DepKind;
use rustc::hir::def_id::{CrateNum, DefId, LOCAL_CRATE};
use rustc::hir;
use rustc::hir::itemlikevisit::ItemLikeVisitor;
Expand Down Expand Up @@ -36,35 +35,11 @@ pub fn crate_inherent_impls<'tcx>(
pub fn inherent_impls<'tcx>(tcx: TyCtxt<'tcx>, ty_def_id: DefId) -> &'tcx [DefId] {
assert!(ty_def_id.is_local());

// NB. Until we adopt the red-green dep-tracking algorithm (see
// [the plan] for details on that), we do some hackery here to get
// the dependencies correct. Basically, we use a `with_ignore` to
// read the result we want. If we didn't have the `with_ignore`,
// we would wind up with a dependency on the entire crate, which
// we don't want. Then we go and add dependencies on all the impls
// in the result (which is what we wanted).
//
// The result is a graph with an edge from `Hir(I)` for every impl
// `I` defined on some type `T` to `CoherentInherentImpls(T)`,
// thus ensuring that if any of those impls change, the set of
// inherent impls is considered dirty.
//
// [the plan]: https://github.com/rust-lang/rust-roadmap/issues/4

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice find!

let result = tcx.dep_graph.with_ignore(|| {
let crate_map = tcx.crate_inherent_impls(ty_def_id.krate);
match crate_map.inherent_impls.get(&ty_def_id) {
Some(v) => &v[..],
None => &[],
}
});

for &impl_def_id in &result[..] {
let def_path_hash = tcx.def_path_hash(impl_def_id);
tcx.dep_graph.read(def_path_hash.to_dep_node(DepKind::Hir));
let crate_map = tcx.crate_inherent_impls(ty_def_id.krate);
match crate_map.inherent_impls.get(&ty_def_id) {
Some(v) => &v[..],
None => &[],
}

result
}

struct InherentCollect<'tcx> {
Expand Down