Commit: Minor tweaks
jamesbornholt committed Jun 28, 2022
1 parent 4922596 commit 64ff2f3
Showing 3 changed files with 88 additions and 66 deletions.
75 changes: 43 additions & 32 deletions src/sync/mutex.rs
@@ -53,42 +53,48 @@ impl<T: ?Sized> Mutex<T> {
let mut state = self.state.borrow_mut();
trace!(holder=?state.holder, waiters=?state.waiters, "waiting to acquire mutex {:p}", self);

// We are waiting for the lock
state.waiters.insert(me);
// If the lock is already held, then we are blocked
if let Some(holder) = state.holder {
if holder == me {
panic!("deadlock! task {:?} tried to acquire a Mutex it already holds", me);
}
ExecutionState::with(|s| s.current_mut().block());

state.waiters.insert(me);
drop(state);

// Note that we only need a context switch when we are blocked, but not if the lock is
// available. Consider that there is another thread `t` that also wants to acquire the
// lock. At the last context switch (where we were chosen), `t` must have been already
// runnable and could have been chosen by the scheduler instead. Also, if we want to
// re-acquire the lock immediately after having it released, we know that the release
// had a context switch that allowed other threads to acquire in between.
// re-acquire the lock immediately after releasing it, we know that the release had a
// context switch that allowed other threads to acquire in between.
ExecutionState::with(|s| s.current_mut().block());
thread::switch();
state = self.state.borrow_mut();

// Once the scheduler has resumed this thread, we are clear to become its holder.
assert!(state.waiters.remove(me));
}

// Once the scheduler has resumed this thread, we are clear to become its holder.
assert!(state.waiters.remove(me));
assert!(state.holder.is_none());
state.holder = Some(me);

trace!(waiters=?state.waiters, "acquired mutex {:p}", self);

// Re-block all other waiting threads, since we won the race to take this lock
for tid in state.waiters.iter() {
ExecutionState::with(|s| s.get_mut(tid).block());
}
// Update acquiring thread's clock with the clock stored in the Mutex
ExecutionState::with(|s| s.update_clock(&state.clock));
// Update the vector clock stored in the Mutex with this thread's clock.
// Future threads that fail a `try_lock` have a causal dependency on this thread's acquire.
ExecutionState::with(|s| state.clock.update(s.get_clock(me)));
ExecutionState::with(|s| {
// Re-block all other waiting threads, since we won the race to take this lock
for tid in state.waiters.iter() {
s.get_mut(tid).block();
}

// Update acquiring thread's clock with the clock stored in the Mutex
s.update_clock(&state.clock);

// Update the vector clock stored in the Mutex with this thread's clock.
// Future threads that fail a `try_lock` have a causal dependency on this thread's acquire.
state.clock.update(s.get_clock(me));
});

drop(state);

// Grab a `MutexGuard` from the inner lock, which we must be able to acquire here
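A note on the clock logic consolidated above: `update` on a vector clock is a pointwise maximum. The stand-in below is illustrative only (Shuttle's real `VectorClock` is defined elsewhere in the crate); it shows why `state.clock.update(s.get_clock(me))` makes the Mutex's clock dominate the acquiring thread's clock, and why `s.update_clock(&state.clock)` makes the thread inherit everything previously recorded in the Mutex.

// Illustrative sketch, not Shuttle's actual implementation.
#[derive(Clone, Debug, Default, PartialEq)]
struct VectorClock(Vec<u32>);

impl VectorClock {
    // Pointwise maximum: the happens-before join performed on acquire.
    fn update(&mut self, other: &VectorClock) {
        if self.0.len() < other.0.len() {
            self.0.resize(other.0.len(), 0);
        }
        for (mine, theirs) in self.0.iter_mut().zip(&other.0) {
            *mine = (*mine).max(*theirs);
        }
    }
}

fn main() {
    let mut mutex_clock = VectorClock(vec![3, 0, 1]);
    let thread_clock = VectorClock(vec![1, 2]);
    mutex_clock.update(&thread_clock);
    assert_eq!(mutex_clock, VectorClock(vec![3, 2, 1]));
}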
@@ -135,17 +141,19 @@ impl<T: ?Sized> Mutex<T> {
// Then `t`'s release has a context switch that allows us to acquire the lock.

let result = if let Some(holder) = state.holder {
trace!("`try_lock` failed to acquire mutex {:p} held by {:?}", self, holder);
trace!("try_lock failed for mutex {:p} held by {:?}", self, holder);
Err(TryLockError::WouldBlock)
} else {
state.holder = Some(me);

trace!("acquired mutex {:p}", self);
trace!("try_lock acquired mutex {:p}", self);

// Re-block all other waiting threads, since we won the race to take this lock
for tid in state.waiters.iter() {
ExecutionState::with(|s| s.get_mut(tid).block());
}
ExecutionState::with(|s| {
for tid in state.waiters.iter() {
s.get_mut(tid).block();
}
});

// Grab a `MutexGuard` from the inner lock, which we must be able to acquire here
match self.inner.try_lock() {
@@ -161,17 +169,20 @@ impl<T: ?Sized> Mutex<T> {
}
};

// Update the vector clock stored in the Mutex with this thread's clock.
// Future threads that manage to acquire have a causal dependency on this thread's failed `try_lock`.
// Future threads that fail a `try_lock` have a causal dependency on this thread's successful `try_lock`.
ExecutionState::with(|s| state.clock.update(s.get_clock(me)));

// Update this thread's clock with the clock stored in the Mutex.
// We need to do the vector clock update even in the failing case, because there's a causal
// dependency: if the `try_lock` fails, the current thread `t1` knows that the thread `t2`
// that owns the lock is in its critical section, and therefore `t1` has a causal dependency
// on everything that happened before in `t2` (which is recorded in the Mutex's clock).
ExecutionState::with(|s| s.update_clock(&state.clock));
ExecutionState::with(|s| {
// Update the vector clock stored in the Mutex with this thread's clock.
// Future threads that manage to acquire have a causal dependency on this thread's failed `try_lock`.
// Future threads that fail a `try_lock` have a causal dependency on this thread's successful `try_lock`.
state.clock.update(s.get_clock(me));

// Update this thread's clock with the clock stored in the Mutex.
// We need to do the vector clock update even in the failing case, because there's a causal
// dependency: if the `try_lock` fails, the current thread `t1` knows that the thread `t2`
// that owns the lock is in its critical section, and therefore `t1` has a causal dependency
// on everything that happened before in `t2` (which is recorded in the Mutex's clock).
s.update_clock(&state.clock);
});

drop(state);

// We need to let other threads in here so they
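The comment block above is the subtle part of this function: even a failed `try_lock` creates a happens-before edge, which is why the clock updates run in both branches. A minimal scenario, sketched with Shuttle's public API (the test itself is illustrative, not from this commit):

use shuttle::sync::Mutex;
use shuttle::thread;
use std::sync::Arc;

fn failed_try_lock_still_synchronizes() {
    shuttle::check_dfs(
        || {
            let lock = Arc::new(Mutex::new(0));
            let lock2 = Arc::clone(&lock);
            let t2 = thread::spawn(move || {
                *lock2.lock().unwrap() += 1;
            });
            // If this fails, we know t2 is inside its critical section, so
            // this thread is now causally ordered after everything t2 did
            // before acquiring; the Mutex's clock carries that information.
            let _ = lock.try_lock();
            t2.join().unwrap();
        },
        None,
    );
}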
4 changes: 2 additions & 2 deletions tests/basic/condvar.rs
@@ -279,7 +279,7 @@ fn check_producer_consumer_broken1() {

#[test]
#[should_panic(expected = "nothing to get")]
fn replay_roducer_consumer_broken1() {
fn replay_producer_consumer_broken1() {
replay(
producer_consumer_broken1,
"910219ccf2ead7a59dee9e4590000282249100208904",
@@ -337,7 +337,7 @@ fn check_producer_consumer_broken2() {

#[test]
#[should_panic(expected = "deadlock")]
fn replay_roducer_consumer_broken2() {
fn replay_producer_consumer_broken2() {
replay(producer_consumer_broken2, "91021499a0ee829bee85922b104410200052a404")
}

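For context on the opaque string literals in these tests: when a Shuttle check fails, it panics with a serialized schedule, and `replay` re-runs that exact interleaving deterministically. The workflow, sketched with an illustrative test (the closure and the placeholder string below are not from this diff):

use shuttle::sync::Mutex;
use shuttle::{replay, thread};
use std::sync::Arc;

// An intentionally racy test that fails only under some schedules.
fn buggy() {
    let lock = Arc::new(Mutex::new(0));
    let lock2 = Arc::clone(&lock);
    thread::spawn(move || {
        *lock2.lock().unwrap() = 1;
    });
    assert_eq!(*lock.lock().unwrap(), 0, "nothing to get");
}

fn reproduce() {
    // A failed `check_random(buggy, 10_000)` run panics and prints a
    // serialized schedule; pasting that string here replays it exactly.
    replay(buggy, "<schedule string printed by the failed run>");
}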
75 changes: 43 additions & 32 deletions tests/basic/mutex.rs
@@ -1,24 +1,27 @@
use shuttle::scheduler::PctScheduler;
use shuttle::sync::Mutex;
use shuttle::{check, check_dfs, check_random, thread, Runner};
use shuttle::{check_dfs, check_random, thread, Runner};
use std::collections::HashSet;
use std::sync::{Arc, TryLockError};
use test_log::test;

#[test]
fn basic_lock_test() {
check(move || {
let lock = Arc::new(Mutex::new(0usize));
let lock_clone = Arc::clone(&lock);
check_dfs(
move || {
let lock = Arc::new(Mutex::new(0usize));
let lock_clone = Arc::clone(&lock);

thread::spawn(move || {
let mut counter = lock_clone.lock().unwrap();
*counter += 1;
});
thread::spawn(move || {
let mut counter = lock_clone.lock().unwrap();
*counter += 1;
});

let mut counter = lock.lock().unwrap();
*counter += 1;
});
let mut counter = lock.lock().unwrap();
*counter += 1;
},
None,
);

// TODO would be cool if we were allowed to smuggle the lock out of the run,
// TODO so we can assert invariants about it *after* execution ends
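The switch from `check` to `check_dfs` throughout this file replaces the default scheduler with exhaustive depth-first exploration of every interleaving; the second argument is an optional bound on how many executions to explore. As used in this diff:

use shuttle::check_dfs;

fn bounded_exploration() {
    check_dfs(
        || {
            // test body: every interleaving of the threads spawned here
            // is explored, depth-first
        },
        Some(10_000), // stop after 10k executions; `None` explores them all
    );
}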
@@ -44,8 +47,7 @@ fn deadlock() {
#[test]
#[should_panic(expected = "deadlock")]
fn deadlock_default() {
// Round-robin should always fail this deadlock test
check(deadlock);
check_dfs(deadlock, None);
}

#[test]
@@ -147,7 +149,7 @@ fn unlock_yields() {
add_thread.join().unwrap();
mul_thread.join().unwrap();

let value = *lock.try_lock().unwrap();
let value = Arc::try_unwrap(lock).unwrap().into_inner().unwrap();
observed_values_clone.lock().unwrap().insert(value);
},
None,
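This tweak (repeated in the two tests below) reads the final counter by consuming the `Arc` instead of taking the lock once more; after every thread has been joined there is only one owner left, so the value can be moved out with no further lock operation (a plausible motivation; the commit message doesn't elaborate). The pattern, standalone:

use shuttle::sync::Mutex;
use std::sync::Arc;

// All clones of `lock` must have been dropped (e.g. by joining the threads
// that held them) before calling this, or `try_unwrap` returns Err.
fn final_value(lock: Arc<Mutex<usize>>) -> usize {
    Arc::try_unwrap(lock)
        .expect("all other Arc clones dropped")
        .into_inner()
        .expect("mutex not poisoned")
}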
@@ -250,7 +252,7 @@ fn concurrent_try_increment() {
thd.join().unwrap();
}

let value = *lock.try_lock().unwrap();
let value = Arc::try_unwrap(lock).unwrap().into_inner().unwrap();
observed_values_clone.lock().unwrap().insert(value);
},
None,
@@ -301,7 +303,7 @@ fn concurrent_lock_try_lock() {
lock_thread.join().unwrap();
try_lock_thread.join().unwrap();

let value = *lock.try_lock().unwrap();
let value = Arc::try_unwrap(lock).unwrap().into_inner().unwrap();
observed_values_clone.lock().unwrap().insert(value);
},
None,
@@ -314,36 +316,45 @@ fn concurrent_lock_try_lock() {
#[test]
#[should_panic(expected = "tried to acquire a Mutex it already holds")]
fn double_lock() {
check(|| {
let mutex = Mutex::new(());
let _guard_1 = mutex.lock().unwrap();
let _guard_2 = mutex.lock();
})
check_dfs(
|| {
let mutex = Mutex::new(());
let _guard_1 = mutex.lock().unwrap();
let _guard_2 = mutex.lock();
},
None,
)
}

#[test]
fn double_try_lock() {
check(|| {
let mutex = Mutex::new(());
let _guard_1 = mutex.try_lock().unwrap();
assert!(matches!(mutex.try_lock(), Err(TryLockError::WouldBlock)));
})
check_dfs(
|| {
let mutex = Mutex::new(());
let _guard_1 = mutex.try_lock().unwrap();
assert!(matches!(mutex.try_lock(), Err(TryLockError::WouldBlock)));
},
None,
)
}

// Check that we can safely execute the Drop handler of a Mutex without double-panicking
#[test]
#[should_panic(expected = "expected panic")]
fn panic_drop() {
check(|| {
let lock = Mutex::new(0);
let _l = lock.lock().unwrap();
panic!("expected panic");
})
check_dfs(
|| {
let lock = Mutex::new(0);
let _l = lock.lock().unwrap();
panic!("expected panic");
},
None,
)
}

#[test]
fn mutex_into_inner() {
shuttle::check_dfs(
check_dfs(
|| {
let lock = Arc::new(Mutex::new(0u64));

