-
Notifications
You must be signed in to change notification settings - Fork 35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for 'abort'ing shuttle::future::JoinHandle's #150
base: main
Are you sure you want to change the base?
Conversation
/// Abort the task associated with the handle. | ||
pub fn abort(&self) { | ||
ExecutionState::try_with(|state| { | ||
if !state.is_finished() { | ||
let task = state.get_mut(self.task_id); | ||
task.detach(); | ||
task.abort(); | ||
} | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think we also need to do something about the Future impl for JoinHandle, otherwise this test deadlocks:
#[test]
fn abort_cancelled() {
check_dfs(
|| {
let t1 = future::spawn(async { 0 });
t1.abort();
let result = future::block_on(t1);
assert!(matches!(result, Err(JoinError::Cancelled)));
},
None,
);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes indeed, I'll have a look.
let mut continuation = self.continuation.borrow_mut(); | ||
continuation.wipe(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe worth a comment about why finish()
isn't good enough — we want to run the task's destructors now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes
1000, | ||
); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like this doesn't work, because it's possible for the abort to happen while t2
is blocked acquiring the lock:
#[test]
fn abort_while_blocked() {
check_dfs(
|| {
let lock = Arc::new(Mutex::new(0i32));
let lock2 = lock.clone();
let t1 = thread::spawn(move || {
let _l = lock2.lock().unwrap();
});
let t2 = future::spawn(async move {
let _l = lock.lock().unwrap();
});
t2.abort();
t1.join().unwrap();
},
None,
);
}
The way it fails right now is a bit funny — when t1
drops the lock, it asserts that all the waiters are still blocked, but when the future aborts it's no longer blocked.
Not sure what the right thing to do here is. Maybe we need to check lazily whether a task has been aborted, only once a poll of the Wrapper
task returns Pending? But that might be more trouble than it's worth for what you want to do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, what about removing the assert!
(which is repeated in unblock
anyways), and then updating unblock
to allow state finished as well ? I need to do a pass and check that we don't ever do "silent passing" of synchro primitives if so. If we do "silent passing" then the "passer" has to check that the task it passes to isn't finished.
Actually, wait, is our implementation of MutexGuard::drop
correct? Aren't std::sync::Mutex
es fair?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Their fairness is platform-dependent IIRC.
Removing the assert might be OK, though we should add an explicit aborted flag as a sanity check that the only way a task gets unblocked is if it was aborted. But tokio's JoinHandle semantics are that the task cancels at the next await
point. I worry about aborting a task mid-synchronization operation because it leaves data structures in an intermediate state that isn't reachable in real code. Like this:
let t1 = future::spawn(async move {
let l1 = lock1.lock().unwrap();
*l1 += 1;
let l2 = lock2.lock().unwrap();
*l2 += 1;
});
t1.abort();
assert_eq!(*l1.lock().unwrap(), *l2.lock().unwrap());
In real Rust this assertion can't fail because the task body is atomic with respect to abort
(there's no await points), but in our implementation it can. (Or at least, it can deadlock? not sure).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm yeah, the immediate drop
of the Continuation
doesn't seem to be the correct solution. drop
ping immediately is correct if it has never been scheduled, else we need to do it at the next await
point. I guess lazily checking in Wrapper
is the correct solution.
Adds support for
JoinHandle::abort
.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.