Add support for 'abort'ing shuttle::future::JoinHandle's #150

Open · wants to merge 1 commit into base: main
14 changes: 12 additions & 2 deletions src/future/mod.rs
@@ -54,12 +54,22 @@ impl<T> Default for JoinHandleInner<T> {
}

impl<T> JoinHandle<T> {
    /// Detach the task associated with the handle.
    fn detach(&self) {
        ExecutionState::try_with(|state| {
            if !state.is_finished() {
                let task = state.get_mut(self.task_id);
                task.detach();
            }
        });
    }

    /// Abort the task associated with the handle.
    pub fn abort(&self) {
        ExecutionState::try_with(|state| {
            if !state.is_finished() {
                let task = state.get_mut(self.task_id);
                task.detach();
                task.abort();
            }
        });
    }
Comment on lines 67 to 75
Member:

Think we also need to do something about the Future impl for JoinHandle, otherwise this test deadlocks:

#[test]
fn abort_cancelled() {
    check_dfs(
        || {
            let t1 = future::spawn(async { 0 });
            t1.abort();
            let result = future::block_on(t1);
            assert!(matches!(result, Err(JoinError::Cancelled)));
        },
        None,
    );
}

Contributor Author:

Ah yes indeed, I'll have a look.

@@ -96,7 +106,7 @@ impl Error for JoinError {}

impl<T> Drop for JoinHandle<T> {
    fn drop(&mut self) {
-        self.abort();
+        self.detach();
    }
}

10 changes: 9 additions & 1 deletion src/runtime/task/mod.rs
@@ -343,6 +343,12 @@ impl Task {
        self.detached = true;
    }

    pub(crate) fn abort(&mut self) {
        self.finish();
        let mut continuation = self.continuation.borrow_mut();
        continuation.wipe();
Comment on lines +348 to +349
Member:

Maybe worth a comment about why finish() isn't good enough — we want to run the task's destructors now.

Contributor Author:

Yes

    }

    pub(crate) fn waker(&self) -> Waker {
        self.waker.clone()
    }
@@ -373,8 +379,10 @@ impl Task {
        self.park_state.blocked_in_park = false;
    }

    // TODO: Investigate whether we should move `wipe` here. (I think the correct scheme is to
    // have it toggleable by the instantiator of the `Task`: tasks modelling async `JoinHandle`s
    // should clean eagerly, those modelling sync `JoinHandle`s should not.)
    pub(crate) fn finish(&mut self) {
        assert!(self.state != TaskState::Finished);
        self.state = TaskState::Finished;
    }

15 changes: 11 additions & 4 deletions src/runtime/thread/continuation.rs
@@ -222,12 +222,19 @@ pub(crate) struct PooledContinuation {
    queue: Rc<RefCell<VecDeque<Continuation>>>,
}

impl PooledContinuation {
    pub fn wipe(&mut self) {
        if let Some(c) = self.continuation.take() {
            if c.reusable() {
                self.queue.borrow_mut().push_back(c);
            }
        }
    }
}

impl Drop for PooledContinuation {
    fn drop(&mut self) {
-        let c = self.continuation.take().unwrap();
-        if c.reusable() {
-            self.queue.borrow_mut().push_back(c);
-        }
+        self.wipe()
    }
}

72 changes: 72 additions & 0 deletions tests/future/basic.rs
@@ -221,6 +221,78 @@ fn async_counter() {
    });
}

// We need a way to hold the `MutexGuard`, which is `!Send`, across an `await`.
struct WrappedMutexGuard<'a> {
    #[allow(unused)]
    guard: shuttle::sync::MutexGuard<'a, ()>,
}

unsafe impl<'a> Send for WrappedMutexGuard<'a> {}

async fn acquire_and_loop(mutex: Arc<Mutex<()>>) {
    let _g = WrappedMutexGuard {
        guard: mutex.lock().unwrap(),
    };
    loop {
        future::yield_now().await;
    }
}

// The idea is to acquire a mutex, abort the JoinHandle, then acquire the Mutex.
// This should succeed, because `JoinHandle::abort()` should free the Mutex.
#[test]
fn abort_frees_mutex() {
    check_random(
        || {
            let mutex = Arc::new(Mutex::new(()));
            let jh = future::spawn(acquire_and_loop(mutex.clone()));

            jh.abort(); // this unblocks

            let _g = mutex.lock();
        },
        1000,
    );
}

// The idea is to acquire a mutex, drop the JoinHandle, then acquire the Mutex.
// This should fail, because `drop`ping the JoinHandle just detaches it, meaning
// it keeps holding the Mutex.
#[test]
#[should_panic(expected = "exceeded max_steps bound")]
fn drop_join_handle_deadlocks() {
    check_random(
        || {
            let mutex = Arc::new(Mutex::new(()));
            let jh = future::spawn(acquire_and_loop(mutex.clone()));

            drop(jh);

            let _g = mutex.lock();
        },
        1000,
    );
}

// The idea is to acquire a mutex, forget the JoinHandle, then acquire the Mutex.
// This should fail, because `forget`ting the JoinHandle doesn't cause it to release
// the Mutex.
#[test]
#[should_panic(expected = "exceeded max_steps bound")]
fn forget_join_handle_deadlocks() {
    check_random(
        || {
            let mutex = Arc::new(Mutex::new(()));
            let jh = future::spawn(acquire_and_loop(mutex.clone()));

            std::mem::forget(jh);

            let _g = mutex.lock();
        },
        1000,
    );
}

Member:

Something like this doesn't work, because it's possible for the abort to happen while t2 is blocked acquiring the lock:

#[test]
fn abort_while_blocked() {
    check_dfs(
        || {
            let lock = Arc::new(Mutex::new(0i32));
            let lock2 = lock.clone();
            let t1 = thread::spawn(move || {
                let _l = lock2.lock().unwrap();
            });
            let t2 = future::spawn(async move {
                let _l = lock.lock().unwrap();
            });
            t2.abort();
            t1.join().unwrap();
        },
        None,
    );
}

The way it fails right now is a bit funny — when t1 drops the lock, it asserts that all the waiters are still blocked, but when the future aborts it's no longer blocked.

Not sure what the right thing to do here is. Maybe we need to check lazily whether a task has been aborted, only once a poll of the Wrapper task returns Pending? But that might be more trouble than it's worth for what you want to do.

Contributor Author:

Hmm, what about removing the assert! (which is repeated in unblock anyway), and then updating unblock to allow the Finished state as well? If so, I need to do a pass and check that we don't ever do "silent passing" of synchronization primitives. If we do "silent passing", then the "passer" has to check that the task it passes to isn't finished.


Actually, wait, is our implementation of MutexGuard::drop correct? Aren't std::sync::Mutexes fair?

Member (@jamesbornholt, Jun 26, 2024):

Their fairness is platform-dependent IIRC.

Removing the assert might be OK, though we should add an explicit aborted flag as a sanity check that the only way a task gets unblocked is if it was aborted. But tokio's JoinHandle semantics are that the task cancels at the next await point. I worry about aborting a task mid-synchronization operation because it leaves data structures in an intermediate state that isn't reachable in real code. Like this:

let t1 = future::spawn({
    let lock1 = lock1.clone();
    let lock2 = lock2.clone();
    async move {
        let mut l1 = lock1.lock().unwrap();
        *l1 += 1;
        let mut l2 = lock2.lock().unwrap();
        *l2 += 1;
    }
});
t1.abort();
assert_eq!(*lock1.lock().unwrap(), *lock2.lock().unwrap());

In real Rust this assertion can't fail because the task body is atomic with respect to abort (there's no await points), but in our implementation it can. (Or at least, it can deadlock? not sure).

Contributor Author:

Hmm yeah, the immediate drop of the Continuation doesn't seem to be the correct solution. Dropping immediately is correct if the task has never been scheduled; otherwise we need to do it at the next await point. I guess lazily checking in Wrapper is the correct solution.

#[test]
fn async_counter_random() {
    check_random(async_counter, 5000)
}