Implement RwLock::{try_read, try_write} #72

Merged · Jul 7, 2022 · 1 commit
10 changes: 8 additions & 2 deletions src/runtime/task/mod.rs
@@ -299,14 +299,20 @@ impl TaskSet {
self.tasks.iter().all(|b| !*b)
}

pub fn insert(&mut self, tid: TaskId) {
/// Add a task to the set. If the set did not have this value present, `true` is returned. If
/// the set did have this value present, `false` is returned.
pub fn insert(&mut self, tid: TaskId) -> bool {
if tid.0 >= self.tasks.len() {
self.tasks.resize(1 + tid.0, false);
}
*self.tasks.get_mut(tid.0).unwrap() = true;
!std::mem::replace(&mut *self.tasks.get_mut(tid.0).unwrap(), true)
}

/// Removes a value from the set. Returns whether the value was present in the set.
pub fn remove(&mut self, tid: TaskId) -> bool {
if tid.0 >= self.tasks.len() {
return false;
}
std::mem::replace(&mut *self.tasks.get_mut(tid.0).unwrap(), false)
}

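
For context, the new `insert`/`remove` return values follow the same contract as `std::collections::HashSet`. A tiny illustration of that contract (hypothetical, using std's `HashSet` for demonstration rather than shuttle's internal `TaskSet`):

```rust
use std::collections::HashSet;

fn main() {
    let mut set: HashSet<usize> = HashSet::new();
    assert!(set.insert(3));   // value was absent: insert returns true
    assert!(!set.insert(3));  // value was present: insert returns false
    assert!(set.remove(&3));  // value was present: remove returns true
    assert!(!set.remove(&3)); // value was absent: remove returns false
}
```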
154 changes: 140 additions & 14 deletions src/sync/rwlock.rs
@@ -11,6 +11,10 @@ use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
use tracing::trace;

/// A reader-writer lock, the same as [`std::sync::RwLock`].
///
/// Unlike [`std::sync::RwLock`], the same thread is never allowed to acquire the read side of a
/// `RwLock` more than once. The `std` version is ambiguous about what behavior is allowed here, so
/// we choose the most conservative one.
pub struct RwLock<T: ?Sized> {
state: Rc<RefCell<RwLockState>>,
inner: std::sync::RwLock<T>,
@@ -99,16 +103,51 @@ impl<T: ?Sized> RwLock<T> {
///
/// If the access could not be granted at this time, then Err is returned. This function does
/// not block.
///
/// Note that unlike [`std::sync::RwLock::try_read`], if the current thread already holds this
/// read lock, `try_read` will return Err.
pub fn try_read(&self) -> TryLockResult<RwLockReadGuard<T>> {
unimplemented!()
if self.try_lock(RwLockType::Read) {
match self.inner.try_read() {
Ok(guard) => Ok(RwLockReadGuard {
inner: Some(guard),
state: Rc::clone(&self.state),
me: ExecutionState::me(),
}),
Err(TryLockError::Poisoned(err)) => Err(TryLockError::Poisoned(PoisonError::new(RwLockReadGuard {
inner: Some(err.into_inner()),
state: Rc::clone(&self.state),
me: ExecutionState::me(),
}))),
Err(TryLockError::WouldBlock) => panic!("rwlock state out of sync"),
}
} else {
Err(TryLockError::WouldBlock)
}
}

/// Attempts to acquire this rwlock with exclusive write access.
///
/// If the access could not be granted at this time, then Err is returned. This function does
/// not block.
pub fn try_write(&self) -> TryLockResult<RwLockWriteGuard<T>> {
unimplemented!()
if self.try_lock(RwLockType::Write) {
match self.inner.try_write() {
Ok(guard) => Ok(RwLockWriteGuard {
inner: Some(guard),
state: Rc::clone(&self.state),
me: ExecutionState::me(),
}),
Err(TryLockError::Poisoned(err)) => Err(TryLockError::Poisoned(PoisonError::new(RwLockWriteGuard {
inner: Some(err.into_inner()),
state: Rc::clone(&self.state),
me: ExecutionState::me(),
}))),
Err(TryLockError::WouldBlock) => panic!("rwlock state out of sync"),
}
} else {
Err(TryLockError::WouldBlock)
}
}
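
A usage sketch of the new non-blocking methods (a hypothetical test, not part of this diff; it assumes shuttle's `check_dfs` runner and the std `TryLockError` type this module already imports):

```rust
use shuttle::check_dfs;
use shuttle::sync::RwLock;
use std::sync::TryLockError;

fn main() {
    check_dfs(
        || {
            let lock = RwLock::new(0u32);

            // No other holder, so the first try_read succeeds.
            let guard = lock.try_read().unwrap();

            // Unlike std, a second try_read on the *same* thread reports
            // WouldBlock instead of succeeding (the conservative behavior
            // documented above).
            assert!(matches!(lock.try_read(), Err(TryLockError::WouldBlock)));

            // A writer can't get in while the read guard is live either.
            assert!(matches!(lock.try_write(), Err(TryLockError::WouldBlock)));

            drop(guard);

            // With the lock free again, try_write succeeds.
            *lock.try_write().unwrap() = 1;
        },
        None,
    );
}
```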

/// Consumes this `RwLock`, returning the underlying data
@@ -125,6 +164,7 @@ impl<T: ?Sized> RwLock<T> {
self.inner.into_inner()
}

/// Acquire the lock in the provided mode, blocking this thread until it succeeds.
fn lock(&self, typ: RwLockType) {
let me = ExecutionState::me();

@@ -133,7 +173,7 @@ impl<T: ?Sized> RwLock<T> {
holder = ?state.holder,
waiting_readers = ?state.waiting_readers,
waiting_writers = ?state.waiting_writers,
"waiting to acquire {:?} lock on rwlock {:p}",
"acquiring {:?} lock on rwlock {:p}",
typ,
self.state,
);
@@ -144,28 +184,37 @@ impl<T: ?Sized> RwLock<T> {
} else {
state.waiting_readers.insert(me);
}
// Block if the lock is in a state where we can't acquire it immediately
match &state.holder {
// Block if the lock is in a state where we can't acquire it immediately. Note that we only
// need to context switch here if we can't acquire the lock. If it's available for us to
// acquire, but there is also another thread `t` that wants to acquire it, then `t` must
// have been runnable when this thread was chosen to execute and could have been chosen
// instead.
let should_switch = match &state.holder {
RwLockHolder::Write(writer) => {
if *writer == me {
panic!("deadlock! task {:?} tried to acquire a RwLock it already holds", me);
}
ExecutionState::with(|s| s.current_mut().block());
true
}
RwLockHolder::Read(readers) => {
if readers.contains(me) {
panic!("deadlock! task {:?} tried to acquire a RwLock it already holds", me);
}
if typ == RwLockType::Write {
ExecutionState::with(|s| s.current_mut().block());
true
} else {
false
}
}
_ => {}
}
RwLockHolder::None => false,
};
drop(state);

// Acquiring a lock is a yield point
thread::switch();
if should_switch {
thread::switch();
}

let mut state = self.state.borrow_mut();
// Once the scheduler has resumed this thread, we are clear to take the lock. We might
@@ -203,14 +252,91 @@ impl<T: ?Sized> RwLock<T> {
typ,
self.state
);
// Update acquiring thread's clock with the clock stored in the RwLock
ExecutionState::with(|s| s.update_clock(&state.clock));

// Increment the current thread's clock and update this RwLock's clock to match.
// TODO we can likely do better here: there is no causality between multiple readers holding
// the lock at the same time.
ExecutionState::with(|s| {
s.update_clock(&state.clock);
state.clock.update(s.get_clock(me));
});

// Block all other waiters, since we won the race to take this lock
// TODO a bit of a bummer that we have to do this (it would be cleaner if those threads
// TODO never become unblocked), but might need to track more state to avoid this.
Self::block_waiters(&*state, me, typ);
drop(state);

// We need to let other threads in here so they may fail a `try_read` or `try_write`. This
// is the case because the current thread holding the lock might not have any further
// context switches until after releasing the lock.
thread::switch();
}
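
To illustrate why that trailing `thread::switch()` matters, here is a hedged sketch (hypothetical test, not part of this diff): without a yield point right after acquisition, a holder with no further context switches before releasing would never be observed holding the lock, so the `WouldBlock` path of `try_read` could go unexplored.

```rust
use shuttle::sync::RwLock;
use shuttle::{check_dfs, thread};
use std::sync::Arc;

fn main() {
    check_dfs(
        || {
            let lock = Arc::new(RwLock::new(()));
            let writer_lock = Arc::clone(&lock);

            let writer = thread::spawn(move || {
                // Write section with no further yield points before release.
                let _guard = writer_lock.write().unwrap();
            });

            // Depending on the explored schedule, this observes the lock
            // either free (Ok) or held by the writer (Err). The switch after
            // a successful acquisition is what lets this run at the "held"
            // point.
            let _ = lock.try_read();

            writer.join().unwrap();
        },
        None,
    );
}
```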

/// Attempt to acquire this lock in the provided mode, but without blocking. Returns `true` if
/// the lock was able to be acquired without blocking, or `false` otherwise.
fn try_lock(&self, typ: RwLockType) -> bool {
let me = ExecutionState::me();

let mut state = self.state.borrow_mut();
trace!(
holder = ?state.holder,
waiting_readers = ?state.waiting_readers,
waiting_writers = ?state.waiting_writers,
"trying to acquire {:?} lock on rwlock {:p}",
typ,
self.state,
);

let acquired = match (typ, &mut state.holder) {
(RwLockType::Write, RwLockHolder::None) => {
state.holder = RwLockHolder::Write(me);
true
}
(RwLockType::Read, RwLockHolder::None) => {
let mut readers = TaskSet::new();
readers.insert(me);
state.holder = RwLockHolder::Read(readers);
true
}
(RwLockType::Read, RwLockHolder::Read(readers)) => {
// If we already hold the read lock, `insert` returns false, which will cause this
// acquisition to fail with `WouldBlock` so we can diagnose potential deadlocks.
readers.insert(me)
}
_ => false,
};

trace!(
"{} {:?} lock on rwlock {:p}",
if acquired { "acquired" } else { "failed to acquire" },
typ,
self.state,
);

// Update this thread's clock with the clock stored in the RwLock.
// We need to do the vector clock update even in the failing case, because there's a causal
// dependency: if the `try_lock` fails, the current thread `t1` knows that the thread `t2`
// that owns the lock is not in the right state to be read/written, and therefore `t1` has a
// causal dependency on everything that happened before in `t2` (which is recorded in the
// RwLock's clock).
// TODO we can likely do better here: there is no causality between successful `try_read`s
// and other concurrent readers, and there's no need to update the clock on failed
// `try_read`s.
ExecutionState::with(|s| {
s.update_clock(&state.clock);
state.clock.update(s.get_clock(me));
});

// Block all other waiters, since we won the race to take this lock
Self::block_waiters(&*state, me, typ);
drop(state);

// We need to let other threads in here so they
// (a) may fail a `try_lock` (in case we acquired), or
// (b) may release the lock (in case we failed to acquire) so we can succeed in a subsequent
// `try_lock`.
thread::switch();

acquired
}
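
The clock manipulation above relies on a vector-clock join. A rough freestanding sketch of that semantics (assumed names and representation for illustration only; Shuttle's actual `VectorClock` and `ExecutionState` types differ in detail):

```rust
#[derive(Clone, Debug, PartialEq)]
struct VectorClock(Vec<u32>);

impl VectorClock {
    /// Component-wise maximum: after the call, `self` dominates both clocks,
    /// i.e. it records everything that causally precedes either one.
    fn update(&mut self, other: &VectorClock) {
        if other.0.len() > self.0.len() {
            self.0.resize(other.0.len(), 0);
        }
        for (mine, theirs) in self.0.iter_mut().zip(other.0.iter()) {
            *mine = (*mine).max(*theirs);
        }
    }
}

fn main() {
    // A thread's clock, and the clock stored in the RwLock by past holders.
    let mut thread_clock = VectorClock(vec![3, 0, 1]);
    let lock_clock = VectorClock(vec![1, 2, 0]);

    // On any lock attempt (even a failed try_lock), the thread joins in the
    // lock's clock, inheriting causality from previous holders.
    thread_clock.update(&lock_clock);
    assert_eq!(thread_clock, VectorClock(vec![3, 2, 1]));
}
```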

fn block_waiters(state: &RwLockState, me: TaskId, typ: RwLockType) {
@@ -324,7 +450,7 @@ impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
state.holder = RwLockHolder::None;
}
}
_ => panic!("exiting a reader but rwlock is in the wrong state"),
_ => panic!("exiting a reader but rwlock is in the wrong state {:?}", state.holder),
}

if ExecutionState::should_stop() {
63 changes: 36 additions & 27 deletions tests/basic/clocks.rs
@@ -1,7 +1,7 @@
use shuttle::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use shuttle::sync::mpsc::{channel, sync_channel};
use shuttle::sync::{Barrier, Condvar, Mutex, Once, RwLock};
use shuttle::{check_dfs, check_pct, current, thread};
use shuttle::{check_dfs, check_pct, check_random, current, thread};
use std::collections::HashSet;
use std::sync::Arc;
use test_log::test;
@@ -10,7 +10,7 @@ pub fn me() -> usize {
usize::from(thread::current().id())
}

// TODO Maybe make this a macro so backtraces are more informative
#[track_caller]
pub fn check_clock(f: impl Fn(usize, u32) -> bool) {
for (i, &c) in current::clock().iter().enumerate() {
assert!(
@@ -76,32 +76,34 @@ fn clock_mutex_pct() {

// RWLocks
fn clock_rwlock(num_writers: usize, num_readers: usize) {
// This test checks that when a thread acquires a RwLock, it inherits the clocks
// of any writers that accessed the lock before it, but not the clocks from any readers.
// This test checks that when a thread acquires a RwLock, it inherits the clocks of writers that
// accessed the lock before it. It's the same as `clock_mutex`, except that readers don't update
// the set S, and aren't required to appear in the clock for future lock holders.
//
// Test: create a rwlock-protected set, initialized with 0 (the id of the main thread)
// and spawn some writers and readers. Each thread does the following:
// (1) check that its own initial vector clock only has nonzero for the main thread (thread 0)
// (2w) [for writers only] acquire a write lock on the set and add its own thread id to it
// (2r) [for readers only] acquire a read lock on the set
// (3) read its own clock again, call this C
// (4) check that the only nonzero entries in C are for the threads in S and the current thread (for readers)
//
// Note: no dummy thread here since we're already checking that readers' clock entries are always zero
let mut set = HashSet::new();
set.insert(0);
let set = Arc::new(RwLock::new(set));
// TODO this test is pretty weak. Testing readers is hard because they race with each other; for
// example, a reader might see the clock update from another reader before that reader has a
// chance to update the set S. Causality is also pretty fuzzy for readers (see the TODOs in the
// RwLock implementation). So we don't test very much about them here.
let set = Arc::new(std::sync::Mutex::new(HashSet::from([0])));
let lock = Arc::new(RwLock::new(()));

// Create dummy thread (should have id 1)
thread::spawn(|| {
assert_eq!(me(), 1usize);
});

// Spawn the writers
let _thds = (0..num_writers)
.map(|_| {
let set = Arc::clone(&set);
let lock = Arc::clone(&lock);
thread::spawn(move || {
check_clock(|i, c| (c > 0) == (i == 0));
let mut set = set.write().unwrap();
let _guard = lock.write().unwrap();
let mut set = set.lock().unwrap();
set.insert(me());
// Check that the only nonzero clock entries are for the threads in the set
check_clock(|i, c| (c > 0) == set.contains(&i));
assert!(!set.contains(&1)); // dummy thread is never in the set
check_clock(|i, c| !set.contains(&i) || (c > 0));
Member: Not clear why you weakened this check from an iff to an implication?

Member Author: It's to handle the race condition described at the top of the test: readers can have a non-zero clock without being in the set.

})
})
.collect::<Vec<_>>();
@@ -110,26 +112,33 @@ fn clock_rwlock(num_writers: usize, num_readers: usize) {
let _thds = (0..num_readers)
.map(|_| {
let set = Arc::clone(&set);
let lock = Arc::clone(&lock);
thread::spawn(move || {
check_clock(|i, c| (c > 0) == (i == 0));
let set = set.read().unwrap();
// Check that the only nonzero clock entries are for threads in the set and the current thread
check_clock(|i, c| (c > 0) == (i == me() || set.contains(&i)));
let _guard = lock.read().unwrap();
let set = set.lock().unwrap();
assert!(!set.contains(&1)); // dummy thread is never in the set
check_clock(|i, c| !set.contains(&i) || (c > 0));
})
})
.collect::<Vec<_>>();
}

#[test]
fn clock_rwlock_dfs() {
// TODO 2 writers + 2 readers takes too long right now; once we reduce context switching, it should be feasible
check_dfs(|| clock_rwlock(2, 1), None);
check_dfs(|| clock_rwlock(1, 2), None);
// Unfortunately anything larger than this takes > 500k iterations, too slow to be useful :(
// But the PCT and random tests below buy us a much bigger search.
check_dfs(|| clock_rwlock(1, 1), None);
}

#[test]
fn clock_rwlock_pct() {
check_pct(|| clock_rwlock(10, 20), 10_000, 3);
check_pct(|| clock_rwlock(4, 4), 10_000, 3);
}

#[test]
fn clock_rwlock_random() {
check_random(|| clock_rwlock(4, 4), 10_000);
}

// Barrier
@@ -336,7 +345,7 @@ fn clock_mpsc_bounded() {
// The sender has sent a message, so its clock is nonzero
let c1 = current::clock().get(1);
assert!(c1 > 0);
let _ = rx.recv().unwrap();
rx.recv().unwrap();
Member Author (on lines -339 to +348): This is a Clippy 1.62 thing.

// The sender has sent another message, so its clock has increased
assert!(current::clock().get(2) > c1);
// Receive the remaining messages