diff --git a/Cargo.toml b/Cargo.toml index 5c4eea8..22ed1de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,8 +17,12 @@ exclude = ["/.*"] [dependencies] concurrent-queue = "2" event-listener = "2.4.0" +event-listener-strategy = { git = "https://github.com/smol-rs/event-listener" } futures-core = "0.3.5" [dev-dependencies] easy-parallel = "3" futures-lite = "1" + +[patch.crates-io] +event-listener = { git = "https://github.com/smol-rs/event-listener" } diff --git a/src/lib.rs b/src/lib.rs index a92f38b..440e59e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,6 +47,7 @@ use std::usize; use concurrent_queue::{ConcurrentQueue, PopError, PushError}; use event_listener::{Event, EventListener}; +use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy}; use futures_core::stream::Stream; struct Channel { @@ -240,11 +241,11 @@ impl Sender { /// # }); /// ``` pub fn send(&self, msg: T) -> Send<'_, T> { - Send { + Send::_new(SendInner { sender: self, listener: None, msg: Some(msg), - } + }) } /// Sends a message into this channel using the blocking strategy. @@ -485,7 +486,11 @@ pub struct Receiver { channel: Arc>, /// Listens for a send or close event to unblock this stream. - listener: Option, + /// + /// TODO: This is pinned and boxed because `Receiver` is `Unpin` and the newest version + /// of `event_listener::EventListener` is not. At the next major release, we can remove the + /// `Pin>` and make `Receiver` `!Unpin`. + listener: Option>>, } impl Receiver { @@ -546,10 +551,10 @@ impl Receiver { /// # }); /// ``` pub fn recv(&self) -> Recv<'_, T> { - Recv { + Recv::_new(RecvInner { receiver: self, listener: None, - } + }) } /// Receives a message from the channel using the blocking strategy. @@ -1059,20 +1064,34 @@ impl fmt::Display for TryRecvError { } } -/// A future returned by [`Sender::send()`]. +easy_wrapper! { + /// A future returned by [`Sender::send()`]. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Send<'a, T>(SendInner<'a, T> => Result<(), SendError>); + pub(crate) wait(); +} + #[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Send<'a, T> { +struct SendInner<'a, T> { sender: &'a Sender, - listener: Option, + /// TODO: This is pinned and boxed because `Send` is `Unpin` and the newest version of + /// `event_listener::EventListener` is not. At the next breaking release of this crate, we can + /// remove the `Pin>` and make `Send` `!Unpin`. + listener: Option>>, msg: Option, } -impl<'a, T> Send<'a, T> { +impl<'a, T> Unpin for SendInner<'a, T> {} + +impl<'a, T> EventListenerFuture for SendInner<'a, T> { + type Output = Result<(), SendError>; + /// Run this future with the given `Strategy`. - fn run_with_strategy( - &mut self, - cx: &mut S::Context, + fn poll_with_strategy<'x, S: Strategy<'x>>( + mut self: Pin<&'x mut Self>, + strategy: &mut S, + context: &mut S::Context, ) -> Poll>> { loop { let msg = self.msg.take().unwrap(); @@ -1084,55 +1103,50 @@ impl<'a, T> Send<'a, T> { } // Sending failed - now start listening for notifications or wait for one. - match self.listener.take() { + match self.listener.as_mut() { None => { // Start listening and then try sending again. self.listener = Some(self.sender.channel.send_ops.listen()); } Some(l) => { // Poll using the given strategy - if let Err(l) = S::poll(l, cx) { - self.listener = Some(l); + if let Poll::Pending = S::poll(strategy, l.as_mut(), context) { return Poll::Pending; + } else { + self.listener = None; } } } } } - - /// Run using the blocking strategy. - fn wait(mut self) -> Result<(), SendError> { - match self.run_with_strategy::(&mut ()) { - Poll::Ready(res) => res, - Poll::Pending => unreachable!(), - } - } } -impl<'a, T> Unpin for Send<'a, T> {} - -impl<'a, T> Future for Send<'a, T> { - type Output = Result<(), SendError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.run_with_strategy::>(cx) - } +easy_wrapper! { + /// A future returned by [`Receiver::recv()`]. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Recv<'a, T>(RecvInner<'a, T> => Result); + pub(crate) wait(); } -/// A future returned by [`Receiver::recv()`]. #[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Recv<'a, T> { +struct RecvInner<'a, T> { receiver: &'a Receiver, - listener: Option, + /// TODO: This is pinned and boxed because `Recv` is `Unpin` and the newest version of + /// `event_listener::EventListener` is not. At the next breaking release of this crate, we can + /// remove the `Pin>` and make `Recv` `!Unpin`. + listener: Option>>, } -impl<'a, T> Unpin for Recv<'a, T> {} +impl<'a, T> Unpin for RecvInner<'a, T> {} + +impl<'a, T> EventListenerFuture for RecvInner<'a, T> { + type Output = Result; -impl<'a, T> Recv<'a, T> { /// Run this future with the given `Strategy`. - fn run_with_strategy( - &mut self, + fn poll_with_strategy<'x, S: Strategy<'x>>( + mut self: Pin<&'x mut Self>, + strategy: &mut S, cx: &mut S::Context, ) -> Poll> { loop { @@ -1144,73 +1158,20 @@ impl<'a, T> Recv<'a, T> { } // Receiving failed - now start listening for notifications or wait for one. - match self.listener.take() { + match self.listener.as_mut() { None => { // Start listening and then try receiving again. self.listener = Some(self.receiver.channel.recv_ops.listen()); } Some(l) => { // Poll using the given strategy. - if let Err(l) = S::poll(l, cx) { - self.listener = Some(l); + if let Poll::Pending = S::poll(strategy, l.as_mut(), cx) { return Poll::Pending; + } else { + self.listener = None; } } } } } - - /// Run with the blocking strategy. - fn wait(mut self) -> Result { - match self.run_with_strategy::(&mut ()) { - Poll::Ready(res) => res, - Poll::Pending => unreachable!(), - } - } -} - -impl<'a, T> Future for Recv<'a, T> { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.run_with_strategy::>(cx) - } -} - -/// A strategy used to poll an `EventListener`. -trait Strategy { - /// Context needed to be provided to the `poll` method. - type Context; - - /// Polls the given `EventListener`. - /// - /// Returns the `EventListener` back if it was not completed; otherwise, - /// returns `Ok(())`. - fn poll(evl: EventListener, cx: &mut Self::Context) -> Result<(), EventListener>; -} - -/// Non-blocking strategy for use in asynchronous code. -struct NonBlocking<'a>(&'a mut ()); - -impl<'a> Strategy for NonBlocking<'a> { - type Context = Context<'a>; - - fn poll(mut evl: EventListener, cx: &mut Context<'a>) -> Result<(), EventListener> { - match Pin::new(&mut evl).poll(cx) { - Poll::Ready(()) => Ok(()), - Poll::Pending => Err(evl), - } - } -} - -/// Blocking strategy for use in synchronous code. -struct Blocking; - -impl Strategy for Blocking { - type Context = (); - - fn poll(evl: EventListener, _cx: &mut ()) -> Result<(), EventListener> { - evl.wait(); - Ok(()) - } }