Skip to content

Commit

Permalink
Allow calling UnboundedReceiver::try_next after None
Browse files Browse the repository at this point in the history
Allow calling `UnboundedReceiver::try_next` and `Receiver::try_next`
after `None`: do not panic.

Not-panicking is equally safe, and does not have negative performance
implication.

It is irrelevant for `Stream` implementation to panic or not (because
`Stream` behavior is unspecified after `None`), but panicking in
`try_next` just complicates the interface: returned `Ok(None)` is
reasonable assumption to have.

Consider this use case: drain the queue on drop by performing
app-specific cleanup of queued messages.

The obvious implementation would be:

```
impl Drop for MyReceiverWrapper {
    fn drop(&mut self) {
        while let Ok(Some(m)) self.try_next() {
            cleanup(m);
        }
    }
}
```

Without this change, I cannot even say for sure how this code need
to be implemented to avoid panicking. E. g. is `is_closed` enough
or some additional checks need to be performed?
  • Loading branch information
stepancheg authored and taiki-e committed Mar 1, 2021
1 parent 48d0096 commit 4d2ad0e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 8 deletions.
16 changes: 8 additions & 8 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,9 +1020,6 @@ impl<T> Receiver<T> {
/// It is not recommended to call this function from inside of a future,
/// only when you've otherwise arranged to be notified when the channel is
/// no longer empty.
///
/// This function will panic if called after `try_next` or `poll_next` has
/// returned `None`.
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
Poll::Ready(msg) => {
Expand All @@ -1033,7 +1030,10 @@ impl<T> Receiver<T> {
}

fn next_message(&mut self) -> Poll<Option<T>> {
let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
let inner = match self.inner.as_mut() {
None => return Poll::Ready(None),
Some(inner) => inner,
};
// Pop off a message
match unsafe { inner.message_queue.pop_spin() } {
Some(msg) => {
Expand Down Expand Up @@ -1173,9 +1173,6 @@ impl<T> UnboundedReceiver<T> {
/// * `Ok(Some(t))` when message is fetched
/// * `Ok(None)` when channel is closed and no messages left in the queue
/// * `Err(e)` when there are no messages available, but channel is not yet closed
///
/// This function will panic if called after `try_next` or `poll_next` has
/// returned `None`.
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
Poll::Ready(msg) => {
Expand All @@ -1186,7 +1183,10 @@ impl<T> UnboundedReceiver<T> {
}

fn next_message(&mut self) -> Poll<Option<T>> {
let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
let inner = match self.inner.as_mut() {
None => return Poll::Ready(None),
Some(inner) => inner,
};
// Pop off a message
match unsafe { inner.message_queue.pop_spin() } {
Some(msg) => {
Expand Down
22 changes: 22 additions & 0 deletions futures-channel/tests/mpsc-close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,25 @@ fn stress_try_send_as_receiver_closes() {
bg.join()
.expect("background thread join");
}

#[test]
fn unbounded_try_next_after_none() {
let (tx, mut rx) = mpsc::unbounded::<String>();
// Drop the sender, close the channel.
drop(tx);
// Receive the end of channel.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
// None received, check we can call `try_next` again.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
}

#[test]
fn bounded_try_next_after_none() {
let (tx, mut rx) = mpsc::channel::<String>(17);
// Drop the sender, close the channel.
drop(tx);
// Receive the end of channel.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
// None received, check we can call `try_next` again.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
}

0 comments on commit 4d2ad0e

Please sign in to comment.