From 4d2ad0e9ceeca738384713eb38268a75741665ef Mon Sep 17 00:00:00 2001 From: Stiopa Koltsov Date: Mon, 1 Mar 2021 02:18:42 +0000 Subject: [PATCH] Allow calling UnboundedReceiver::try_next after None Allow calling `UnboundedReceiver::try_next` and `Receiver::try_next` after `None`: do not panic. Not-panicking is equally safe, and does not have negative performance implication. It is irrelevant for `Stream` implementation to panic or not (because `Stream` behavior is unspecified after `None`), but panicking in `try_next` just complicates the interface: returned `Ok(None)` is reasonable assumption to have. Consider this use case: drain the queue on drop by performing app-specific cleanup of queued messages. The obvious implementation would be: ``` impl Drop for MyReceiverWrapper { fn drop(&mut self) { while let Ok(Some(m)) self.try_next() { cleanup(m); } } } ``` Without this change, I cannot even say for sure how this code need to be implemented to avoid panicking. E. g. is `is_closed` enough or some additional checks need to be performed? --- futures-channel/src/mpsc/mod.rs | 16 ++++++++-------- futures-channel/tests/mpsc-close.rs | 22 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index 854ba34935..7442569eb3 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -1020,9 +1020,6 @@ impl Receiver { /// It is not recommended to call this function from inside of a future, /// only when you've otherwise arranged to be notified when the channel is /// no longer empty. - /// - /// This function will panic if called after `try_next` or `poll_next` has - /// returned `None`. pub fn try_next(&mut self) -> Result, TryRecvError> { match self.next_message() { Poll::Ready(msg) => { @@ -1033,7 +1030,10 @@ impl Receiver { } fn next_message(&mut self) -> Poll> { - let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`"); + let inner = match self.inner.as_mut() { + None => return Poll::Ready(None), + Some(inner) => inner, + }; // Pop off a message match unsafe { inner.message_queue.pop_spin() } { Some(msg) => { @@ -1173,9 +1173,6 @@ impl UnboundedReceiver { /// * `Ok(Some(t))` when message is fetched /// * `Ok(None)` when channel is closed and no messages left in the queue /// * `Err(e)` when there are no messages available, but channel is not yet closed - /// - /// This function will panic if called after `try_next` or `poll_next` has - /// returned `None`. pub fn try_next(&mut self) -> Result, TryRecvError> { match self.next_message() { Poll::Ready(msg) => { @@ -1186,7 +1183,10 @@ impl UnboundedReceiver { } fn next_message(&mut self) -> Poll> { - let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`"); + let inner = match self.inner.as_mut() { + None => return Poll::Ready(None), + Some(inner) => inner, + }; // Pop off a message match unsafe { inner.message_queue.pop_spin() } { Some(msg) => { diff --git a/futures-channel/tests/mpsc-close.rs b/futures-channel/tests/mpsc-close.rs index 9eb5296d88..0df78f7854 100644 --- a/futures-channel/tests/mpsc-close.rs +++ b/futures-channel/tests/mpsc-close.rs @@ -276,3 +276,25 @@ fn stress_try_send_as_receiver_closes() { bg.join() .expect("background thread join"); } + +#[test] +fn unbounded_try_next_after_none() { + let (tx, mut rx) = mpsc::unbounded::(); + // Drop the sender, close the channel. + drop(tx); + // Receive the end of channel. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); + // None received, check we can call `try_next` again. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); +} + +#[test] +fn bounded_try_next_after_none() { + let (tx, mut rx) = mpsc::channel::(17); + // Drop the sender, close the channel. + drop(tx); + // Receive the end of channel. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); + // None received, check we can call `try_next` again. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); +}