Commit

Implement Mutex::try_lock
Without `try_lock` it was easier to justify context switches, because
acquire was a right mover (we only needed a context switch before) and
release was a left mover (we only needed a context switch after).
However, with `try_lock` that is not the case anymore. This commit
argues why we need a context switch at the end of `lock` and `try_lock`
(in both the success and failure cases), and why we do not need a context
switch at the beginning of `try_lock` and `MutexGuard::drop`.
bkragl committed Jun 15, 2022
1 parent 71b8ad2 commit ca23ea6
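
To make the mover argument concrete, here is a minimal sketch of the kind of schedule the commit is worried about, written in the style of the crate's existing tests. The test name, body, and imports (assumed to be `shuttle::sync::Mutex`, `shuttle::check_random`, and `shuttle::thread`) are illustrative assumptions, not part of this commit; the committed `concurrent_lock_try_lock` test referenced in the diff below may look different. One thread acquires with `lock` and has no further yield points inside its critical section, so the switch at the end of `lock` is the only opportunity for the other thread's `try_lock` to run while the lock is held and exercise its `WouldBlock` path.

use shuttle::sync::Mutex;
use shuttle::{check_random, thread};
use std::sync::Arc;

// Hypothetical test, not part of this commit.
#[test]
fn lock_vs_try_lock() {
    check_random(
        || {
            let lock = Arc::new(Mutex::new(()));

            let prober = {
                let lock = Arc::clone(&lock);
                thread::spawn(move || {
                    // Should be schedulable while the other thread is inside its
                    // critical section; otherwise the `WouldBlock` outcome is
                    // never explored.
                    let _ = lock.try_lock();
                })
            };

            {
                let _guard = lock.lock().unwrap();
                // Critical section with no yield points of its own; the switch
                // at the end of `lock` is the prober's only chance to run while
                // the lock is held.
            }

            prober.join().unwrap();
        },
        1000,
    )
}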
Showing 6 changed files with 485 additions and 112 deletions.
96 changes: 88 additions & 8 deletions src/sync/mutex.rs
@@ -61,13 +61,18 @@ impl<T: ?Sized> Mutex<T> {
panic!("deadlock! task {:?} tried to acquire a Mutex it already holds", me);
}
ExecutionState::with(|s| s.current_mut().block());
drop(state);

// Note that we only need a context switch when we are blocked, but not if the lock is
// available. Consider that there is another thread `t` that also wants to acquire the
// lock. At the last context switch (where we were chosen), `t` must have been already
// runnable and could have been chosen by the scheduler instead. Also, if we want to
// re-acquire the lock immediately after having released it, we know that the release
// had a context switch that allowed other threads to acquire it in between.
thread::switch();
state = self.state.borrow_mut();
}
drop(state);

// Acquiring a lock is a yield point
thread::switch();

let mut state = self.state.borrow_mut();
// Once the scheduler has resumed this thread, we are clear to become its holder.
assert!(state.waiters.remove(me));
assert!(state.holder.is_none());
@@ -84,7 +89,7 @@ impl<T: ?Sized> Mutex<T> {
drop(state);

// Grab a `MutexGuard` from the inner lock, which we must be able to acquire here
match self.inner.try_lock() {
let result = match self.inner.try_lock() {
Ok(guard) => Ok(MutexGuard {
inner: Some(guard),
mutex: self,
@@ -94,15 +99,84 @@ impl<T: ?Sized> Mutex<T> {
mutex: self,
})),
Err(TryLockError::WouldBlock) => panic!("mutex state out of sync"),
}
};

// We need to let other threads in here so they may fail a `try_lock`. This is the case
// because the current thread holding the lock might not have any further context switches
// until after releasing the lock. The `concurrent_lock_try_lock` test illustrates this
// scenario and would fail if this context switch is not here.
thread::switch();

result
}

/// Attempts to acquire this lock.
///
/// If the lock could not be acquired at this time, then Err is returned. This function does not
/// block.
pub fn try_lock(&self) -> TryLockResult<MutexGuard<T>> {
unimplemented!()
let me = ExecutionState::me();

let mut state = self.state.borrow_mut();
trace!(holder=?state.holder, waiters=?state.waiters, "trying to acquire mutex {:p}", self);

// We don't need a context switch here. There are two cases to analyze.
// * Consider that `state.holder == None` so that we manage to acquire the lock, but that
// there is another thread `t` that also wants to acquire. At the last context switch
// (where we were chosen), `t` must have been already runnable and could have been chosen
// by the scheduler instead. Then `t`'s acquire has a context switch that allows us to
// run into the `WouldBlock` case.
// * Consider that `state.holder == Some(t)` so that we run into the `WouldBlock` case,
// but that `t` wants to release. At the last context switch (where we were chosen), `t`
// must have been already runnable and could have been chosen by the scheduler instead.
// Then `t`'s release has a context switch that allows us to acquire the lock.

let result = if let Some(holder) = state.holder {
trace!("`try_lock` failed to acquire mutex {:p} held by {:?}", self, holder);

// Update the vector clock stored in the Mutex, because future threads that manage to
// acquire the lock have a causal dependency on this failed `try_lock`.
ExecutionState::with(|s| state.clock.update(s.get_clock(me)));

Err(TryLockError::WouldBlock)
} else {
state.holder = Some(me);

trace!("acquired mutex {:p}", self);

// Re-block all other waiting threads, since we won the race to take this lock
for tid in state.waiters.iter() {
ExecutionState::with(|s| s.get_mut(tid).block());
}

// Grab a `MutexGuard` from the inner lock, which we must be able to acquire here
match self.inner.try_lock() {
Ok(guard) => Ok(MutexGuard {
inner: Some(guard),
mutex: self,
}),
Err(TryLockError::Poisoned(guard)) => Err(TryLockError::Poisoned(PoisonError::new(MutexGuard {
inner: Some(guard.into_inner()),
mutex: self,
}))),
Err(TryLockError::WouldBlock) => panic!("mutex state out of sync"),
}
};

// Update this thread's clock with the clock stored in the Mutex.
// We need to do the vector clock update even in the failing case, because there's a causal
// dependency: if the `try_lock` fails, the current thread `t1` knows that the thread `t2`
// that owns the lock is in its critical section, and therefore `t1` has a causal dependency
// on everything that happened before in `t2` (which is recorded in the Mutex's clock).
ExecutionState::with(|s| s.update_clock(&state.clock));
drop(state);

// We need to let other threads in here so they
// (a) may fail a `try_lock` (in case we acquired), or
// (b) may release the lock (in case we failed to acquire) so we can succeed in a subsequent `try_lock`.
thread::switch();

result
}

/// Consumes this mutex, returning the underlying data.
@@ -153,6 +227,12 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> {

impl<'a, T: ?Sized> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
// We don't need a context switch here *before* releasing the lock. There are two cases to analyze.
// * Other threads that want to `lock` are still blocked at this point.
// * Other threads that want to `try_lock` and would fail at this point (but not after we release)
// were already runnable at the last context switch (which could have been right after we acquired)
// and could have been scheduled then to fail the `try_lock`.

self.inner = None;

let mut state = self.mutex.state.borrow_mut();
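
As a usage-level companion to the `try_lock` reasoning above, the sketch below (again an illustrative assumption, not part of this commit, using the same assumed `shuttle::` imports) shows how calling code can distinguish the three outcomes of the new `try_lock` (success, `WouldBlock`, poisoned), and the one fact a `WouldBlock` result conveys: the other thread is currently inside its critical section, so a subsequent blocking `lock` must observe its update.

use shuttle::sync::Mutex;
use shuttle::{check_random, thread};
use std::sync::{Arc, TryLockError};

// Hypothetical test, not part of this commit.
#[test]
fn try_lock_observes_holder() {
    check_random(
        || {
            let lock = Arc::new(Mutex::new(0u32));

            let writer = {
                let lock = Arc::clone(&lock);
                thread::spawn(move || {
                    *lock.lock().unwrap() = 1;
                })
            };

            match lock.try_lock() {
                // We ran before the writer acquired, or after it finished.
                Ok(guard) => assert!(*guard == 0 || *guard == 1),
                // The writer holds the lock right now, so it is inside its
                // critical section; blocking on `lock` must observe its write.
                Err(TryLockError::WouldBlock) => assert_eq!(*lock.lock().unwrap(), 1),
                Err(TryLockError::Poisoned(e)) => panic!("poisoned: {}", e),
            }

            writer.join().unwrap();
        },
        1000,
    )
}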
155 changes: 84 additions & 71 deletions tests/basic/condvar.rs
@@ -232,100 +232,113 @@ fn notify_one_order() {
/// From "Operating Systems: Three Easy Pieces", Figure 30.8.
/// Demonstrates why a waiter needs to check the condition in a `while` loop, not an if.
/// http://pages.cs.wisc.edu/~remzi/OSTEP/threads-cv.pdf
#[test]
#[should_panic(expected = "nothing to get")]
fn producer_consumer_broken1() {
replay(
|| {
let lock = Arc::new(Mutex::new(()));
let cond = Arc::new(Condvar::new());
let count = Arc::new(AtomicUsize::new(0));

// Two consumers
for _ in 0..2 {
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
let count = Arc::clone(&count);
thread::spawn(move || {
for _ in 0..2 {
let mut guard = lock.lock().unwrap();
if count.load(Ordering::SeqCst) == 0 {
guard = cond.wait(guard).unwrap();
}
// get()
assert_eq!(count.load(Ordering::SeqCst), 1, "nothing to get");
count.store(0, Ordering::SeqCst);
cond.notify_one();
drop(guard); // explicit unlock to match the book
}
});
}
let lock = Arc::new(Mutex::new(()));
let cond = Arc::new(Condvar::new());
let count = Arc::new(AtomicUsize::new(0));

// One producer
// Two consumers
for _ in 0..2 {
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
let count = Arc::clone(&count);
thread::spawn(move || {
for _ in 0..2 {
let mut guard = lock.lock().unwrap();
if count.load(Ordering::SeqCst) == 1 {
if count.load(Ordering::SeqCst) == 0 {
guard = cond.wait(guard).unwrap();
}
// put()
assert_eq!(count.load(Ordering::SeqCst), 0, "no space to put");
count.store(1, Ordering::SeqCst);
// get()
assert_eq!(count.load(Ordering::SeqCst), 1, "nothing to get");
count.store(0, Ordering::SeqCst);
cond.notify_one();
drop(guard);
drop(guard); // explicit unlock to match the book
}
},
"910215000000408224002229",
});
}

// One producer
for _ in 0..2 {
let mut guard = lock.lock().unwrap();
if count.load(Ordering::SeqCst) == 1 {
guard = cond.wait(guard).unwrap();
}
// put()
assert_eq!(count.load(Ordering::SeqCst), 0, "no space to put");
count.store(1, Ordering::SeqCst);
cond.notify_one();
drop(guard);
}
}

#[test]
#[should_panic]
fn check_producer_consumer_broken1() {
check_random(producer_consumer_broken1, 5000)
}

#[test]
#[should_panic(expected = "nothing to get")]
fn replay_producer_consumer_broken1() {
replay(
producer_consumer_broken1,
"910219ccf2ead7a59dee9e4590000282249100208904",
)
}

/// From "Operating Systems: Three Easy Pieces", Figure 30.10. Like `producer_consumer_broken1`, but
/// with a while loop, not an if.
/// Demonstrates why one condvar is not sufficient for a concurrent queue.
/// http://pages.cs.wisc.edu/~remzi/OSTEP/threads-cv.pdf
#[test]
#[should_panic(expected = "deadlock")]
fn producer_consumer_broken2() {
replay(
|| {
let lock = Arc::new(Mutex::new(()));
let cond = Arc::new(Condvar::new());
let count = Arc::new(AtomicUsize::new(0));

// Two consumers
for _ in 0..2 {
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
let count = Arc::clone(&count);
thread::spawn(move || {
for _ in 0..1 {
let mut guard = lock.lock().unwrap();
while count.load(Ordering::SeqCst) == 0 {
guard = cond.wait(guard).unwrap();
}
// get()
assert_eq!(count.load(Ordering::SeqCst), 1, "nothing to get");
count.store(0, Ordering::SeqCst);
cond.notify_one();
drop(guard);
}
});
}
let lock = Arc::new(Mutex::new(()));
let cond = Arc::new(Condvar::new());
let count = Arc::new(AtomicUsize::new(0));

// One producer
for _ in 0..2 {
// Two consumers
for _ in 0..2 {
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
let count = Arc::clone(&count);
thread::spawn(move || {
for _ in 0..1 {
let mut guard = lock.lock().unwrap();
while count.load(Ordering::SeqCst) == 1 {
while count.load(Ordering::SeqCst) == 0 {
guard = cond.wait(guard).unwrap();
}
// put()
assert_eq!(count.load(Ordering::SeqCst), 0, "no space to put");
count.store(1, Ordering::SeqCst);
// get()
assert_eq!(count.load(Ordering::SeqCst), 1, "nothing to get");
count.store(0, Ordering::SeqCst);
cond.notify_one();
drop(guard);
}
},
"9102110090401004405202",
)
});
}

// One producer
for _ in 0..2 {
let mut guard = lock.lock().unwrap();
while count.load(Ordering::SeqCst) == 1 {
guard = cond.wait(guard).unwrap();
}
// put()
assert_eq!(count.load(Ordering::SeqCst), 0, "no space to put");
count.store(1, Ordering::SeqCst);
cond.notify_one();
drop(guard);
}
}

#[test]
#[should_panic]
fn check_producer_consumer_broken2() {
check_random(producer_consumer_broken2, 5000)
}

#[test]
#[should_panic(expected = "deadlock")]
fn replay_producer_consumer_broken2() {
replay(producer_consumer_broken2, "91021499a0ee829bee85922b104410200052a404")
}

/// From "Operating Systems: Three Easy Pieces", Figure 30.12. Like `producer_consumer_broken2`, but
