Implement Mutex::try_lock
Without `try_lock` it was easy to justify context switches, because
acquire was a right mover (it commutes to the right of other threads'
steps, so we only needed a context switch before it) and release was a
left mover (so we only needed a context switch after it). With
`try_lock` that is no longer the case: a failed `try_lock` observes the
lock state without blocking, so it is neither a right nor a left mover.
This commit argues why we need a context switch at the end of `lock`
and `try_lock` (in both the success and failure cases), and why we do
not need a context switch at the beginning of `try_lock` and
`MutexGuard::drop`.
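
As a concrete sketch of the first point (an illustration in the style of the tests added below, not part of the commit itself), the schedule in which `try_lock` fails is reachable only because `lock` now ends with a context switch:

    use shuttle::sync::Mutex;
    use shuttle::{check_dfs, thread};
    use std::sync::Arc;

    fn try_lock_sees_both_outcomes() {
        check_dfs(
            || {
                let m = Arc::new(Mutex::new(0usize));
                let m2 = Arc::clone(&m);
                let handle = thread::spawn(move || {
                    // Fails only in schedules that run this step between the
                    // main thread's acquire and release; the context switch at
                    // the end of `lock` is what makes that point schedulable.
                    let _ = m2.try_lock();
                });
                let guard = m.lock().unwrap();
                drop(guard);
                handle.join().unwrap();
            },
            None,
        );
    }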
bkragl committed May 25, 2022
1 parent 71b8ad2 commit fce040a
Showing 2 changed files with 196 additions and 5 deletions.
76 changes: 73 additions & 3 deletions src/sync/mutex.rs
@@ -84,7 +84,7 @@ impl<T: ?Sized> Mutex<T> {
drop(state);

// Grab a `MutexGuard` from the inner lock, which we must be able to acquire here
-match self.inner.try_lock() {
+let result = match self.inner.try_lock() {
Ok(guard) => Ok(MutexGuard {
inner: Some(guard),
mutex: self,
@@ -94,15 +94,79 @@ impl<T: ?Sized> Mutex<T> {
mutex: self,
})),
Err(TryLockError::WouldBlock) => panic!("mutex state out of sync"),
-}
+};

// We need to let other threads in here so they may fail a `try_lock`
thread::switch();

result
}

/// Attempts to acquire this lock.
///
/// If the lock could not be acquired at this time, then `Err` is returned. This function does not
/// block.
pub fn try_lock(&self) -> TryLockResult<MutexGuard<T>> {
-unimplemented!()
let me = ExecutionState::me();

let mut state = self.state.borrow_mut();
trace!(holder=?state.holder, waiters=?state.waiters, "trying to acquire mutex {:p}", self);

if let Some(holder) = state.holder {
if holder == me {
panic!("deadlock! task {:?} tried to acquire a Mutex it already holds", me);
}
}

// We don't need a context switch here. There are two cases to analyze.
// * Consider that we manage to acquire the lock, but that there is another thread that
// also wants to acquire. At the last context switch (where we were chosen), the other
// thread must have been already runnable and could have been chosen by the scheduler.
// Then that other thread's acquire has a context switch that allows us to run into the
// `WouldBlock` case.
// * Consider that we run into the `WouldBlock` case, but that there is another thread
// that wants to release. At the last context switch (where we were chosen), the other
// thread must have been already runnable and could have been chosen by the scheduler.
// Then that other thread's release has a context switch that allows us to acquire the
// lock.
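// For example, the schedule [B: lock] -> [A: try_lock = WouldBlock] ->
// [B: unlock] -> [A: try_lock = Ok] is reachable without a switch here,
// because B's acquire and release each end in a context switch at which
// A is runnable.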

let result = if state.holder.is_none() {
state.holder = Some(me);

trace!("acquired mutex {:p}", self);

// Re-block all other waiting threads, since we won the race to take this lock
for tid in state.waiters.iter() {
ExecutionState::with(|s| s.get_mut(tid).block());
}
// Update acquiring thread's clock with the clock stored in the Mutex
ExecutionState::with(|s| s.update_clock(&state.clock));

// Grab a `MutexGuard` from the inner lock, which we must be able to acquire here
match self.inner.try_lock() {
Ok(guard) => Ok(MutexGuard {
inner: Some(guard),
mutex: self,
}),
Err(TryLockError::Poisoned(guard)) => Err(TryLockError::Poisoned(PoisonError::new(MutexGuard {
inner: Some(guard.into_inner()),
mutex: self,
}))),
Err(TryLockError::WouldBlock) => panic!("mutex state out of sync"),
}
} else {
trace!("failed to acquired mutex {:p}", self);
Err(TryLockError::WouldBlock)
};

drop(state);

// We need to let other threads in here so they may fail a `try_lock` (in case we acquired)
// or they may release the lock (in case we failed to acquire) so we can succeed in a
// subsequent `try_lock`.
thread::switch();

result
}

/// Consumes this mutex, returning the underlying data.
@@ -153,6 +217,12 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> {

impl<'a, T: ?Sized> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
// We don't need a context switch here *before* releasing the lock. There are two cases to analyze.
// * Other threads that want to `lock` are still blocked at this point.
// * Other threads that want to `try_lock` and would fail at this point (but not after we release)
// were already runnable at the last context switch (which could have been right after we acquired)
// and could have been scheduled then to fail the `try_lock`.

self.inner = None;

let mut state = self.mutex.state.borrow_mut();
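Note that the new `try_lock` panics when a task tries to acquire a `Mutex` it already holds (the deadlock check above), rather than returning `WouldBlock`. A hypothetical test in the style of the suite below could exercise this:

    use shuttle::check_random;
    use shuttle::sync::Mutex;

    #[test]
    #[should_panic(expected = "deadlock")]
    fn try_lock_on_held_mutex_panics() {
        check_random(
            || {
                let m = Mutex::new(0usize);
                let _guard = m.lock().unwrap();
                // Second acquisition attempt by the same task panics with
                // "deadlock! task ... tried to acquire a Mutex it already holds".
                let _ = m.try_lock();
            },
            10,
        );
    }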
125 changes: 123 additions & 2 deletions tests/basic/mutex.rs
@@ -1,7 +1,9 @@
use shuttle::scheduler::PctScheduler;
use shuttle::sync::Mutex;
-use shuttle::{check, check_random, thread, Runner};
-use std::sync::Arc;
+use shuttle::{check, check_dfs, check_random, thread, Runner};
+use std::collections::HashMap;
+use std::sync::atomic::Ordering;
+use std::sync::{Arc, TryLockError};
use test_log::test;

#[test]
@@ -114,6 +116,125 @@ fn concurrent_increment() {
});
}

#[test]
/// Two concurrent threads trying to do an atomic increment using `try_lock`.
/// One `try_lock` must succeed, while the other may or may not succeed.
/// Thus we expect to see final values 1 and 2.
fn concurrent_try_increment() {
let final_value_tracker = Arc::new(
(1usize..=2usize)
.map(|i| (i, std::sync::atomic::AtomicBool::new(false)))
.collect::<HashMap<_, _>>(),
);
let final_value_tracker_clone = Arc::clone(&final_value_tracker);

check_dfs(
move || {
let lock = Arc::new(Mutex::new(0usize));

let threads = (0..2)
.map(|_| {
let lock = Arc::clone(&lock);
thread::spawn(move || match lock.try_lock() {
Ok(mut guard) => {
*guard += 1;
}
Err(TryLockError::WouldBlock) => (),
Err(_) => panic!("unexpected TryLockError"),
})
})
.collect::<Vec<_>>();

for thd in threads {
thd.join().unwrap();
}

let value = *lock.try_lock().unwrap();

final_value_tracker_clone
.get(&value)
.expect("unexpected final value")
.store(true, Ordering::SeqCst);
},
None,
);

for (value, hit) in final_value_tracker.iter() {
assert!(
hit.load(Ordering::SeqCst),
"did not see an interleaving with final value {}",
value
);
}
}

#[test]
/// Two concurrent threads, one doing an atomic increment by 1 using `lock`,
/// the other trying to do an atomic increment by 1 followed by trying to do
/// an atomic increment by 2 using `try_lock`. The `lock` must succeed, while
/// both `try_lock`s may or may not succeed. Thus we expect to see final values
/// 1 (both `try_lock`s fail), 2 (second `try_lock` fails), 3 (first `try_lock`
/// fails), and 4 (both `try_lock`s succeed).
fn concurrent_lock_try_lock() {
let final_value_tracker = Arc::new(
(1usize..=4usize)
.map(|i| (i, std::sync::atomic::AtomicBool::new(false)))
.collect::<HashMap<_, _>>(),
);
let final_value_tracker_clone = Arc::clone(&final_value_tracker);

check_dfs(
move || {
let lock = Arc::new(Mutex::new(0usize));

let lock_thread = {
let lock = Arc::clone(&lock);
thread::spawn(move || {
*lock.lock().unwrap() += 1;
})
};
let try_lock_thread = {
let lock = Arc::clone(&lock);
thread::spawn(move || {
match lock.try_lock() {
Ok(mut guard) => {
*guard += 1;
}
Err(TryLockError::WouldBlock) => (),
Err(_) => panic!("unexpected TryLockError"),
};
match lock.try_lock() {
Ok(mut guard) => {
*guard += 2;
}
Err(TryLockError::WouldBlock) => (),
Err(_) => panic!("unexpected TryLockError"),
}
})
};

lock_thread.join().unwrap();
try_lock_thread.join().unwrap();

let value = *lock.try_lock().unwrap();

final_value_tracker_clone
.get(&value)
.expect("unexpected final value")
.store(true, Ordering::SeqCst);
},
None,
);

for (value, hit) in final_value_tracker.iter() {
assert!(
hit.load(Ordering::SeqCst),
"did not see an interleaving with final value {}",
value
);
}
}

// Check that we can safely execute the Drop handler of a Mutex without double-panicking
#[test]
#[should_panic(expected = "expected panic")]
