Skip to content

Commit

Permalink
Move lock into HELPER to avoid deadlocking
Browse files Browse the repository at this point in the history
Previously, we were holding a global lock whenever we were waiting for a
requested token, and since the only way to release a token was by acquiring that
lock, we could relatively easily deadlock (at least if `-j1` was passed, or so,
otherwise it's likely we'd just be much slower).
  • Loading branch information
Mark-Simulacrum committed Feb 15, 2020
1 parent fd0bc0e commit d88fcc1
Showing 1 changed file with 66 additions and 55 deletions.
121 changes: 66 additions & 55 deletions src/librustc_jobserver/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,19 @@
//! communication with the jobserver (otherwise it's too hard to work with the
//! jobserver API). This is non-ideal, and it would be good to avoid, but
//! currently that cost is pretty much required for correct functionality, as we
//! must be able to atomically wait on both a Condvar (for other threads
//! releasing the implicit token) and the jobserver itself. That's not possible
//! with the jobserver API today unless we spawn up an additional thread.
//! must be able to atomically wait on both other threads releasing the implicit
//! token and the jobserver itself. That's not possible with the jobserver API
//! today unless we spawn up an additional thread.
//!
//! There are 3 primary APIs this crate exposes:
//! * acquire()
//! * release()
//! * acquire_from_request()
//! * request_token()
//! * `acquire_thread()`
//! * `release_thread()`
//! * `request_token(impl FnOnce(Acquired))`
//!
//! The first two, acquire and release, are blocking functions which acquire
//! and release a jobserver token.
//! `acquire_thread` blocks on obtaining a token, `release_thread` releases a
//! token without blocking.
//!
//! The latter two help manage async requesting of tokens: specifically,
//! acquire_from_request() will block on acquiring token but will not request it
//! from the jobserver itself, whereas the last one just requests a token (and
//! should return pretty quickly, i.e., it does not block on some event).
//! `request_token` queues up the called function without blocking.
//!
//! -------------------------------------
//!
Expand Down Expand Up @@ -62,8 +58,9 @@ use jobserver::Client;
use lazy_static::lazy_static;
use rustc_serialize::json::as_json;
use std::collections::VecDeque;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{Arc, Condvar, Mutex, MutexGuard};

lazy_static! {
// We can only call `from_env` once per process
Expand Down Expand Up @@ -91,7 +88,7 @@ lazy_static! {
})
};

static ref HELPER: Mutex<Helper> = Mutex::new(Helper::new());
static ref HELPER: Helper = Helper::new();
}

// These are the values for TOKEN_REQUESTS, which is an enum between these
Expand Down Expand Up @@ -139,22 +136,38 @@ pub fn initialize(token_requests: bool) {
}
}

pub struct Helper {
struct Helper {
helper: jobserver::HelperThread,

token_requests: Arc<Mutex<TokenRequests>>,
}

struct TokenRequests {
tokens: usize,
requests: Arc<Mutex<VecDeque<Box<dyn FnOnce(Acquired) + Send>>>>,
requests: VecDeque<Box<dyn FnOnce(Acquired) + Send>>,
}

impl TokenRequests {
fn new() -> Self {
Self { tokens: 1, requests: VecDeque::new() }
}

fn push_request(&mut self, request: impl FnOnce(Acquired) + Send + 'static) {
self.requests.push_back(Box::new(request));
}
}

impl Helper {
fn new() -> Self {
let requests: Arc<Mutex<VecDeque<Box<dyn FnOnce(Acquired) + Send>>>> =
Arc::new(Mutex::new(VecDeque::new()));
let requests2 = requests.clone();
let requests = Arc::new(Mutex::new(TokenRequests::new()));
let helper_thread_requests = requests.clone();
let helper = GLOBAL_CLIENT
.clone()
.into_helper_thread(move |token| {
log::trace!("Helper thread token sending into channel");
if let Some(sender) = requests2.lock().unwrap().pop_front() {
let mut helper_thread_requests = helper_thread_requests.lock().unwrap();
let sender = helper_thread_requests.requests.pop_front();
if let Some(sender) = sender {
// We've acquired a token, but we need to not use it as we have our own
// custom release-on-drop struct since we'll want different logic than
// just normally releasing the token in this case.
Expand All @@ -163,49 +176,47 @@ impl Helper {
// was in the pipe (i.e., we just write back the same byte all the time)
// but that's not expected to be a problem.
token.expect("acquire token").drop_without_releasing();
sender(Acquired::new());
sender(Acquired::new(helper_thread_requests));
}

// If we didn't manage to send the token off, just drop it on
// the ground; it'll get released automatically.
})
.expect("spawned helper");
Helper { helper, tokens: 1, requests }
Helper { helper, token_requests: requests }
}

// This blocks on acquiring a token (that must have been previously
// requested).
fn acquire_token_from_prior_request(&mut self) -> Acquired {
if self.tokens == 0 {
self.tokens += 1;
return Acquired::new();
// This blocks on acquiring a token that was requested from the
// HelperThread, i.e., through `Helper::request_token`.
fn acquire_token_from_prior_request(&self) -> Acquired {
let mut token_requests = self.token_requests.lock().unwrap();
if token_requests.tokens == 0 {
return Acquired::new(token_requests);
}

let receiver = Arc::new((Mutex::new(None), Condvar::new()));
let receiver2 = receiver.clone();

self.requests.lock().unwrap().push_back(Box::new(move |token| {
token_requests.push_request(move |token| {
let mut slot = receiver.0.lock().unwrap();
*slot = Some(token);
receiver.1.notify_one();
}));
});

let (lock, cvar) = &*receiver2;
let mut guard = cvar.wait_while(lock.lock().unwrap(), |slot| slot.is_none()).unwrap();
// Release tokens guard after registering our callback
mem::drop(token_requests);

self.tokens += 1;
let (lock, cvar) = &*receiver2;
let mut guard = cvar.wait_while(lock.lock().unwrap(), |s| s.is_none()).unwrap();
guard.take().unwrap()
}

fn release_token(&mut self) {
let mut requests = self.requests.lock().unwrap();

self.tokens -= 1;

if self.tokens == 0 {
fn release_token(&self) {
let mut token_requests = self.token_requests.lock().unwrap();
token_requests.tokens -= 1;
if token_requests.tokens == 0 {
// If there is a sender, then it needs to be given this token.
if let Some(sender) = requests.pop_front() {
sender(Acquired::new());
if let Some(sender) = token_requests.requests.pop_front() {
sender(Acquired::new(token_requests));
return;
}

Expand All @@ -219,7 +230,7 @@ impl Helper {
}
}

pub fn request_token(&self) {
fn request_token(&self) {
log::trace!("{:?} requesting token", std::thread::current().id());
// Just notify, don't actually acquire here.
notify_acquiring_token();
Expand All @@ -241,7 +252,9 @@ impl Drop for Acquired {
}

impl Acquired {
fn new() -> Self {
fn new(mut requests: MutexGuard<'_, TokenRequests>) -> Self {
// When we create a token, bump up the acquired token counter
requests.tokens += 1;
Self { armed: true }
}

Expand All @@ -267,21 +280,19 @@ fn notify_acquiring_token() {
}
}

/// This does not block the current thread, but schedules the passed callback to
/// be called at some point in the future when a token is acquired.
pub fn request_token(f: impl FnOnce(Acquired) + Send + 'static) {
HELPER.lock().unwrap().requests.lock().unwrap().push_back(Box::new(move |token| {
f(token);
}));
}

pub fn acquire_from_request() -> Acquired {
HELPER.lock().unwrap().acquire_token_from_prior_request()
HELPER.token_requests.lock().unwrap().push_request(f);
HELPER.request_token();
}

/// This blocks the current thread until a token is acquired.
pub fn acquire_thread() {
HELPER.lock().unwrap().request_token();
HELPER.lock().unwrap().acquire_token_from_prior_request().disarm();
HELPER.request_token();
HELPER.acquire_token_from_prior_request().disarm();
}

pub fn release_thread() {
HELPER.lock().unwrap().release_token();
HELPER.release_token();
}

0 comments on commit d88fcc1

Please sign in to comment.