Skip to content

Commit

Permalink
sync: fix mpsc bug related to closing the channel (#3215)
Browse files Browse the repository at this point in the history
When closing a channel, it is possible to get into an invalid state when
outstanding permits release capacity back to the channel.
  • Loading branch information
carllerche authored Dec 7, 2020
1 parent c63057e commit 3d17488
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
4 changes: 4 additions & 0 deletions tokio/src/sync/semaphore_ll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,10 @@ impl Waiter {
let mut curr = WaiterState(self.state.load(Acquire));

loop {
if curr.is_closed() {
return 0;
}

if !curr.is_queued() {
assert_eq!(0, curr.permits_to_acquire());
}
Expand Down
24 changes: 24 additions & 0 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,3 +512,27 @@ fn ready_close_cancel_bounded() {

assert!(recv.is_woken());
}

#[tokio::test]
async fn permit_available_not_acquired_close() {
use futures::future::poll_fn;

let (mut tx1, mut rx) = mpsc::channel::<()>(1);
let mut tx2 = tx1.clone();

{
let mut ready = task::spawn(poll_fn(|cx| tx1.poll_ready(cx)));
assert_ready_ok!(ready.poll());
}

let mut ready = task::spawn(poll_fn(|cx| tx2.poll_ready(cx)));
assert_pending!(ready.poll());

rx.close();

drop(tx1);
assert!(ready.is_woken());

drop(tx2);
assert!(rx.recv().await.is_none());
}

0 comments on commit 3d17488

Please sign in to comment.