From 6d99af0ef40965d3a54e84a77124144326b0552c Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 6 Aug 2023 11:55:32 -0700 Subject: [PATCH] Fix test errors and make futures !Unpin This is a breaking change. However, it comes with the ability to avoid heap allocations in many cases, which is a significant boon for users for async-channel. Signed-off-by: John Nunley --- Cargo.toml | 1 + src/lib.rs | 185 +++++++++++++++++++++------------------------ tests/bounded.rs | 11 ++- tests/unbounded.rs | 3 +- 4 files changed, 98 insertions(+), 102 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 22ed1de..57fff5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ concurrent-queue = "2" event-listener = "2.4.0" event-listener-strategy = { git = "https://github.com/smol-rs/event-listener" } futures-core = "0.3.5" +pin-project-lite = "0.2.11" [dev-dependencies] easy-parallel = "3" diff --git a/src/lib.rs b/src/lib.rs index 440e59e..d3a6083 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,7 @@ use std::usize; use concurrent_queue::{ConcurrentQueue, PopError, PushError}; use event_listener::{Event, EventListener}; use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy}; +use futures_core::ready; use futures_core::stream::Stream; struct Channel { @@ -129,8 +130,8 @@ pub fn bounded(cap: usize) -> (Sender, Receiver) { channel: channel.clone(), }; let r = Receiver { + listener: EventListener::new(&channel.stream_ops), channel, - listener: None, }; (s, r) } @@ -169,8 +170,8 @@ pub fn unbounded() -> (Sender, Receiver) { channel: channel.clone(), }; let r = Receiver { + listener: EventListener::new(&channel.stream_ops), channel, - listener: None, }; (s, r) } @@ -243,8 +244,8 @@ impl Sender { pub fn send(&self, msg: T) -> Send<'_, T> { Send::_new(SendInner { sender: self, - listener: None, msg: Some(msg), + listener: EventListener::new(&self.channel.send_ops), }) } @@ -473,24 +474,34 @@ impl Clone for Sender { } } -/// The receiving side of a channel. -/// -/// Receivers can be cloned and shared among threads. When all receivers associated with a channel -/// are dropped, the channel becomes closed. -/// -/// The channel can also be closed manually by calling [`Receiver::close()`]. -/// -/// Receivers implement the [`Stream`] trait. -pub struct Receiver { - /// Inner channel state. - channel: Arc>, - - /// Listens for a send or close event to unblock this stream. +pin_project_lite::pin_project! { + /// The receiving side of a channel. + /// + /// Receivers can be cloned and shared among threads. When all receivers associated with a channel + /// are dropped, the channel becomes closed. + /// + /// The channel can also be closed manually by calling [`Receiver::close()`]. /// - /// TODO: This is pinned and boxed because `Receiver` is `Unpin` and the newest version - /// of `event_listener::EventListener` is not. At the next major release, we can remove the - /// `Pin>` and make `Receiver` `!Unpin`. - listener: Option>>, + /// Receivers implement the [`Stream`] trait. + pub struct Receiver { + // Inner channel state. + channel: Arc>, + + // Listens for a send or close event to unblock this stream. + #[pin] + listener: EventListener, + } + + impl PinnedDrop for Receiver { + fn drop(this: Pin<&mut Self>) { + let this = this.project(); + + // Decrement the receiver count and close the channel if it drops down to zero. + if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { + this.channel.close(); + } + } + } } impl Receiver { @@ -553,7 +564,7 @@ impl Receiver { pub fn recv(&self) -> Recv<'_, T> { Recv::_new(RecvInner { receiver: self, - listener: None, + listener: EventListener::new(&self.channel.recv_ops), }) } @@ -755,15 +766,6 @@ impl Receiver { } } -impl Drop for Receiver { - fn drop(&mut self) { - // Decrement the receiver count and close the channel if it drops down to zero. - if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { - self.channel.close(); - } - } -} - impl fmt::Debug for Receiver { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Receiver {{ .. }}") @@ -781,7 +783,7 @@ impl Clone for Receiver { Receiver { channel: self.channel.clone(), - listener: None, + listener: EventListener::new(&self.channel.stream_ops), } } } @@ -792,9 +794,11 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { // If this stream is listening for events, first wait for a notification. - if let Some(listener) = self.listener.as_mut() { - futures_core::ready!(Pin::new(listener).poll(cx)); - self.listener = None; + { + let this = self.as_mut().project(); + if this.listener.is_listening() { + ready!(this.listener.poll(cx)); + } } loop { @@ -802,27 +806,30 @@ impl Stream for Receiver { match self.try_recv() { Ok(msg) => { // The stream is not blocked on an event - drop the listener. - self.listener = None; + let mut this = self.project(); + this.listener + .as_mut() + .set(EventListener::new(&this.channel.stream_ops)); return Poll::Ready(Some(msg)); } Err(TryRecvError::Closed) => { // The stream is not blocked on an event - drop the listener. - self.listener = None; + let mut this = self.project(); + this.listener + .as_mut() + .set(EventListener::new(&this.channel.stream_ops)); return Poll::Ready(None); } Err(TryRecvError::Empty) => {} } // Receiving failed - now start listening for notifications or wait for one. - match self.listener.as_mut() { - None => { - // Create a listener and try sending the message again. - self.listener = Some(self.channel.stream_ops.listen()); - } - Some(_) => { - // Go back to the outer loop to poll the listener. - break; - } + let mut this = self.as_mut().project(); + if this.listener.is_listening() { + // Go back to the outer loop to wait for a notification. + break; + } else { + this.listener.as_mut().listen(); } } } @@ -907,7 +914,7 @@ impl WeakReceiver { } Ok(_) => Some(Receiver { channel: self.channel.clone(), - listener: None, + listener: EventListener::new(&self.channel.stream_ops), }), } } @@ -1072,50 +1079,42 @@ easy_wrapper! { pub(crate) wait(); } -#[derive(Debug)] -struct SendInner<'a, T> { - sender: &'a Sender, - /// TODO: This is pinned and boxed because `Send` is `Unpin` and the newest version of - /// `event_listener::EventListener` is not. At the next breaking release of this crate, we can - /// remove the `Pin>` and make `Send` `!Unpin`. - listener: Option>>, - msg: Option, +pin_project_lite::pin_project! { + #[derive(Debug)] + struct SendInner<'a, T> { + sender: &'a Sender, + msg: Option, + #[pin] + listener: EventListener, + } } -impl<'a, T> Unpin for SendInner<'a, T> {} - impl<'a, T> EventListenerFuture for SendInner<'a, T> { type Output = Result<(), SendError>; /// Run this future with the given `Strategy`. fn poll_with_strategy<'x, S: Strategy<'x>>( - mut self: Pin<&'x mut Self>, + self: Pin<&'x mut Self>, strategy: &mut S, context: &mut S::Context, ) -> Poll>> { + let mut this = self.project(); + loop { - let msg = self.msg.take().unwrap(); + let msg = this.msg.take().unwrap(); // Attempt to send a message. - match self.sender.try_send(msg) { + match this.sender.try_send(msg) { Ok(()) => return Poll::Ready(Ok(())), Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))), - Err(TrySendError::Full(m)) => self.msg = Some(m), + Err(TrySendError::Full(m)) => *this.msg = Some(m), } // Sending failed - now start listening for notifications or wait for one. - match self.listener.as_mut() { - None => { - // Start listening and then try sending again. - self.listener = Some(self.sender.channel.send_ops.listen()); - } - Some(l) => { - // Poll using the given strategy - if let Poll::Pending = S::poll(strategy, l.as_mut(), context) { - return Poll::Pending; - } else { - self.listener = None; - } - } + if this.listener.is_listening() { + // Poll using the given strategy + ready!(S::poll(strategy, this.listener.as_mut(), context)); + } else { + this.listener.as_mut().listen(); } } } @@ -1129,48 +1128,40 @@ easy_wrapper! { pub(crate) wait(); } -#[derive(Debug)] -struct RecvInner<'a, T> { - receiver: &'a Receiver, - /// TODO: This is pinned and boxed because `Recv` is `Unpin` and the newest version of - /// `event_listener::EventListener` is not. At the next breaking release of this crate, we can - /// remove the `Pin>` and make `Recv` `!Unpin`. - listener: Option>>, +pin_project_lite::pin_project! { + #[derive(Debug)] + struct RecvInner<'a, T> { + receiver: &'a Receiver, + #[pin] + listener: EventListener, + } } -impl<'a, T> Unpin for RecvInner<'a, T> {} - impl<'a, T> EventListenerFuture for RecvInner<'a, T> { type Output = Result; /// Run this future with the given `Strategy`. fn poll_with_strategy<'x, S: Strategy<'x>>( - mut self: Pin<&'x mut Self>, + self: Pin<&'x mut Self>, strategy: &mut S, cx: &mut S::Context, ) -> Poll> { + let mut this = self.project(); + loop { // Attempt to receive a message. - match self.receiver.try_recv() { + match this.receiver.try_recv() { Ok(msg) => return Poll::Ready(Ok(msg)), Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)), Err(TryRecvError::Empty) => {} } // Receiving failed - now start listening for notifications or wait for one. - match self.listener.as_mut() { - None => { - // Start listening and then try receiving again. - self.listener = Some(self.receiver.channel.recv_ops.listen()); - } - Some(l) => { - // Poll using the given strategy. - if let Poll::Pending = S::poll(strategy, l.as_mut(), cx) { - return Poll::Pending; - } else { - self.listener = None; - } - } + if this.listener.is_listening() { + // Poll using the given strategy + ready!(S::poll(strategy, this.listener.as_mut(), cx)); + } else { + this.listener.as_mut().listen(); } } } diff --git a/tests/bounded.rs b/tests/bounded.rs index 0ae4890..abb8895 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -332,9 +332,10 @@ fn forget_blocked_sender() { .add(move || { assert!(future::block_on(s1.send(3)).is_ok()); assert!(future::block_on(s1.send(7)).is_ok()); - let mut s1_fut = s1.send(13); + let s1_fut = s1.send(13); + futures_lite::pin!(s1_fut); // Poll but keep the future alive. - assert_eq!(future::block_on(future::poll_once(&mut s1_fut)), None); + assert_eq!(future::block_on(future::poll_once(s1_fut)), None); sleep(ms(500)); }) .add(move || { @@ -358,8 +359,9 @@ fn forget_blocked_receiver() { Parallel::new() .add(move || { - let mut r1_fut = r1.recv(); + let r1_fut = r1.recv(); // Poll but keep the future alive. + futures_lite::pin!(r1_fut); assert_eq!(future::block_on(future::poll_once(&mut r1_fut)), None); sleep(ms(500)); }) @@ -436,8 +438,9 @@ fn mpmc_stream() { Parallel::new() .each(0..THREADS, { - let mut r = r; + let r = r; move |_| { + futures_lite::pin!(r); for _ in 0..COUNT { let n = future::block_on(r.next()).unwrap(); v[n].fetch_add(1, Ordering::SeqCst); diff --git a/tests/unbounded.rs b/tests/unbounded.rs index e239d34..31d4987 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -295,8 +295,9 @@ fn mpmc_stream() { Parallel::new() .each(0..THREADS, { - let mut r = r.clone(); + let r = r.clone(); move |_| { + futures_lite::pin!(r); for _ in 0..COUNT { let n = future::block_on(r.next()).unwrap(); v[n].fetch_add(1, Ordering::SeqCst);