diff --git a/futures-channel/benches/sync_mpsc.rs b/futures-channel/benches/sync_mpsc.rs index 2296d88a20..89c9140e63 100755 --- a/futures-channel/benches/sync_mpsc.rs +++ b/futures-channel/benches/sync_mpsc.rs @@ -114,7 +114,6 @@ impl Stream for TestSender { try_ready!(self.tx.poll_ready(cx).map_err(|_| ())); self.tx.start_send(self.last + 1).unwrap(); self.last += 1; - assert!(self.tx.poll_flush(cx).unwrap().is_ready()); Ok(Async::Ready(Some(self.last))) } } diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index 06dd9bbb78..b96eba69e4 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -5,18 +5,19 @@ //! [`Sender`](Sender) handles. [`Receiver`](Receiver) implements //! [`Stream`](futures_core::Stream) and allows a task to read values out of the //! channel. If there is no message to read from the channel, the current task -//! will be notified when a new value is sent. [`Sender`](Sender) implements the +//! will be awoken when a new value is sent. [`Sender`](Sender) implements the //! `Sink` trait and allows a task to send messages into //! the channel. If the channel is at capacity, the send will be rejected and -//! the task will be notified when additional capacity is available. In other -//! words, the channel provides backpressure. +//! the task will be awoken when additional capacity is available. This process +//! of delaying sends beyond a certain capacity is often referred to as +//! "backpressure". //! -//! Unbounded channels are also available using the [`unbounded`](unbounded) -//! constructor. +//! Unbounded channels (without backpressure) are also available using +//! the [`unbounded`](unbounded) function. //! //! # Disconnection //! -//! When all [`Sender`](Sender) handles have been dropped, it is no longer +//! When all [`Sender`](Sender)s have been dropped, it is no longer //! possible to send values into the channel. This is considered the termination //! event of the stream. As such, [`Receiver::poll_next`](Receiver::poll_next) //! will return `Ok(Ready(None))`. @@ -37,36 +38,37 @@ // At the core, the channel uses an atomic FIFO queue for message passing. This // queue is used as the primary coordination primitive. In order to enforce // capacity limits and handle back pressure, a secondary FIFO queue is used to -// send parked task handles. +// send wakers for blocked `Sender` tasks. // // The general idea is that the channel is created with a `buffer` size of `n`. // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed" // slot to hold a message. This allows `Sender` to know for a fact that a send -// will succeed *before* starting to do the actual work of sending the value. -// Since most of this work is lock-free, once the work starts, it is impossible -// to safely revert. +// can be successfully started *before* beginning to do the actual work of +// sending the value. However, a `send` will not complete until the number of +// messages in the channel has dropped back down below the configured buffer +// size. // -// If the sender is unable to process a send operation, then the current -// task is parked and the handle is sent on the parked task queue. -// -// Note that the implementation guarantees that the channel capacity will never -// exceed the configured limit, however there is no *strict* guarantee that the -// receiver will wake up a parked task *immediately* when a slot becomes -// available. However, it will almost always unpark a task when a slot becomes -// available and it is *guaranteed* that a sender will be unparked when the -// message that caused the sender to become parked is read out of the channel. +// Note that the implementation guarantees that the number of items that have +// finished sending into a channel without being received will not exceed the +// configured buffer size. However, there is no *strict* guarantee that the +// receiver will wake up a blocked `Sender` *immediately* when the buffer size +// drops below the configured limit. However, it will almost always awaken a +// `Sender` when buffer space becomes available, and it is *guaranteed* that a +// `Sender` will be awoken by the time its most recently-sent message is +// popped out of the channel by the `Receiver`. // // The steps for sending a message are roughly: // // 1) Increment the channel message count -// 2) If the channel is at capacity, push the task handle onto the wait queue -// 3) Push the message onto the message queue. +// 2) If the channel is at capacity, push the task's waker onto the wait queue +// 3) Push the message onto the message queue +// 4) If a wakeup was queued, wait for it to occur // // The steps for receiving a message are roughly: // // 1) Pop a message from the message queue -// 2) Pop a task handle from the wait queue -// 3) Decrement the channel message count. +// 2) Pop a task waker from the wait queue +// 3) Decrement the channel message count // // It's important for the order of operations on lock-free structures to happen // in reverse order between the sender and receiver. This makes the message @@ -77,13 +79,13 @@ use std::fmt; use std::error::Error; use std::any::Any; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Mutex}; use std::thread; use std::usize; -use futures_core::task::{self, Waker}; +use futures_core::task::{self, AtomicWaker, Waker}; use futures_core::{Async, Poll, Stream}; use futures_core::never::Never; @@ -102,22 +104,13 @@ pub struct Sender { // Handle to the task that is blocked on this sender. This handle is sent // to the receiver half in order to be notified when the sender becomes // unblocked. - sender_task: Arc>, + sender_waker: Arc>, // True if the sender might be blocked. This is an optimization to avoid // having to lock the mutex most of the time. - maybe_parked: bool, + maybe_blocked: bool, } -/// The transmission end of an unbounded mpsc channel. -/// -/// This value is created by the [`unbounded`](unbounded) function. -#[derive(Debug)] -pub struct UnboundedSender(Sender); - -trait AssertKinds: Send + Sync + Clone {} -impl AssertKinds for UnboundedSender {} - /// The receiving end of a bounded mpsc channel. /// /// This value is created by the [`channel`](channel) function. @@ -126,12 +119,6 @@ pub struct Receiver { inner: Arc>, } -/// The receiving end of an unbounded mpsc channel. -/// -/// This value is created by the [`unbounded`](unbounded) function. -#[derive(Debug)] -pub struct UnboundedReceiver(Receiver); - /// The error type for [`Sender`s](Sender) used as `Sink`s. #[derive(Clone, Debug, PartialEq, Eq)] pub struct SendError { @@ -192,6 +179,10 @@ impl SendError { _ => false, } } + + fn disconnected() -> Self { + SendError { kind: SendErrorKind::Disconnected } + } } impl fmt::Debug for TrySendError { @@ -276,13 +267,13 @@ struct Inner { message_queue: Queue>, // Atomic, FIFO queue used to send parked task handles to the receiver. - parked_queue: Queue>>, + parked_queue: Queue>>, // Number of senders in existence num_senders: AtomicUsize, - // Handle to the receiver's task. - recv_task: Mutex, + // Waker for the receiver's task. + recv_waker: AtomicWaker, } // Struct representation of `Inner::state`. @@ -295,19 +286,6 @@ struct State { num_messages: usize, } -#[derive(Debug)] -struct ReceiverTask { - unparked: bool, - task: Option, -} - -// Returned from Receiver::try_park() -enum TryPark { - Parked, - Closed, - NotEmpty, -} - // The `is_open` flag is stored in the left-most bit of `Inner::state` const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1); @@ -324,24 +302,24 @@ const MAX_BUFFER: usize = MAX_CAPACITY >> 1; // Sent to the consumer to wake up blocked producers #[derive(Debug)] -struct SenderTask { - task: Option, - is_parked: bool, +struct SenderWaker { + waker: Option, + is_blocked: bool, } -impl SenderTask { +impl SenderWaker { fn new() -> Self { - SenderTask { - task: None, - is_parked: false, + SenderWaker { + waker: None, + is_blocked: false, } } - fn notify(&mut self) { - self.is_parked = false; + fn wake(&mut self) { + self.is_blocked = false; - if let Some(task) = self.task.take() { - task.wake(); + if let Some(waker) = self.waker.take() { + waker.wake(); } } } @@ -364,20 +342,6 @@ pub fn channel(buffer: usize) -> (Sender, Receiver) { channel2(Some(buffer)) } -/// Creates an unbounded mpsc channel for communicating between asynchronous tasks. -/// -/// A `send` on this channel will always succeed as long as the receive half has -/// not been closed. If the receiver falls behind, messages will be arbitrarily -/// buffered. -/// -/// **Note** that the amount of available system memory is an implicit bound to -/// the channel. Using an `unbounded` channel has the ability of causing the -/// process to run out of memory. In this case, the process will be aborted. -pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { - let (tx, rx) = channel2(None); - (UnboundedSender(tx), UnboundedReceiver(rx)) -} - fn channel2(buffer: Option) -> (Sender, Receiver) { let inner = Arc::new(Inner { buffer: buffer, @@ -385,16 +349,13 @@ fn channel2(buffer: Option) -> (Sender, Receiver) { message_queue: Queue::new(), parked_queue: Queue::new(), num_senders: AtomicUsize::new(1), - recv_task: Mutex::new(ReceiverTask { - unparked: false, - task: None, - }), + recv_waker: AtomicWaker::new(), }); let tx = Sender { inner: inner.clone(), - sender_task: Arc::new(Mutex::new(SenderTask::new())), - maybe_parked: false, + sender_waker: Arc::new(Mutex::new(SenderWaker::new())), + maybe_blocked: false, }; let rx = Receiver { @@ -475,7 +436,7 @@ impl Sender { self.park(cx); } - self.queue_push_and_signal(Some(msg)); + self.push_msg_and_wake_receiver(Some(msg)); Ok(()) } @@ -506,31 +467,18 @@ impl Sender { }, }; - self.queue_push_and_signal(msg); + self.push_msg_and_wake_receiver(msg); Ok(()) } - fn poll_ready_nb(&self) -> Poll<(), SendError> { - let state = decode_state(self.inner.state.load(SeqCst)); - if state.is_open { - Ok(Async::Ready(())) - } else { - Err(SendError { - kind: SendErrorKind::Full, - }) - } - } - - // Push message to the queue and signal to the receiver - fn queue_push_and_signal(&self, msg: Option) { + fn push_msg_and_wake_receiver(&self, msg: Option) { // Push the message onto the message queue self.inner.message_queue.push(msg); - // Signal to the receiver that a message has been enqueued. If the - // receiver is parked, this will unpark the task. - self.signal(); + // Awaken the reciever task if it was blocked. + self.inner.recv_waker.wake(); } // Increment the number of queued messages. Returns if the sender should @@ -576,54 +524,24 @@ impl Sender { } } - // Signal to the receiver task that a message has been enqueued - fn signal(&self) { - // TODO - // This logic can probably be improved by guarding the lock with an - // atomic. - // - // Do this step first so that the lock is dropped when - // `unpark` is called - let task = { - let mut recv_task = self.inner.recv_task.lock().unwrap(); - - // If the receiver has already been unparked, then there is nothing - // more to do - if recv_task.unparked { - return; - } - - // Setting this flag enables the receiving end to detect that - // an unpark event happened in order to avoid unnecessarily - // parking. - recv_task.unparked = true; - recv_task.task.take() - }; - - if let Some(task) = task { - task.wake(); - } - } - fn park(&mut self, cx: Option<&mut task::Context>) { // TODO: clean up internal state if the task::current will fail - let task = cx.map(|cx| cx.waker().clone()); + let waker = cx.map(|cx| cx.waker().clone()); { - let mut sender = self.sender_task.lock().unwrap(); - sender.task = task; - sender.is_parked = true; + let mut sender = self.sender_waker.lock().unwrap(); + sender.waker = waker; + sender.is_blocked = true; } // Send handle over queue - let t = self.sender_task.clone(); + let t = self.sender_waker.clone(); self.inner.parked_queue.push(t); // Check to make sure we weren't closed after we sent our task on the // queue - let state = decode_state(self.inner.state.load(SeqCst)); - self.maybe_parked = state.is_open; + self.maybe_blocked = !self.is_closed(); } /// Polls the channel to determine if there is guaranteed capacity to send @@ -663,24 +581,24 @@ impl Sender { } fn poll_unparked(&mut self, cx: Option<&mut task::Context>) -> Async<()> { - // First check the `maybe_parked` variable. This avoids acquiring the + // First check the `maybe_blocked` variable. This avoids acquiring the // lock in most cases - if self.maybe_parked { + if self.maybe_blocked { // Get a lock on the task handle - let mut task = self.sender_task.lock().unwrap(); + let mut sender_waker = self.sender_waker.lock().unwrap(); - if !task.is_parked { - self.maybe_parked = false; + if !sender_waker.is_blocked { + self.maybe_blocked = false; return Async::Ready(()) } - // At this point, an unpark request is pending, so there will be an - // unpark sometime in the future. We just need to make sure that + // At this point, an wake request is pending, so there will be an + // wake sometime in the future. We just need to make sure that // the correct task will be notified. // - // Update the task in case the `Sender` has been moved to another + // Update the waker in case the `Sender` has been moved to another // task - task.task = cx.map(|cx| cx.waker().clone()); + sender_waker.waker = cx.map(|cx| cx.waker().clone()); Async::Pending } else { @@ -689,52 +607,6 @@ impl Sender { } } -impl UnboundedSender { - /// Check if the channel is ready to receive a message. - pub fn poll_ready(&self, _: &mut task::Context) -> Poll<(), SendError> { - self.0.poll_ready_nb() - } - - /// Returns whether this channel is closed without needing a context. - pub fn is_closed(&self) -> bool { - self.0.is_closed() - } - - /// Closes this channel from the sender side, preventing any new messages. - pub fn close_channel(&self) { - // There's no need to park this sender, its dropping, - // and we don't want to check for capacity, so skip - // that stuff from `do_send`. - - let _ = self.0.do_send_nb(None); - } - - /// Send a message on the channel. - /// - /// This method should only be called after `poll_ready` has been used to - /// verify that the channel is ready to receive a message. - pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.0.do_send_nb(Some(msg)) - .map_err(|e| e.err) - } - - /// Sends a message along this channel. - /// - /// This is an unbounded sender, so this function differs from `Sink::send` - /// by ensuring the return type reflects that the channel is always ready to - /// receive messages. - pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { - self.0.do_send_nb(Some(msg)) - } -} - -impl Clone for UnboundedSender { - fn clone(&self) -> UnboundedSender { - UnboundedSender(self.0.clone()) - } -} - - impl Clone for Sender { fn clone(&self) -> Sender { // Since this atomic op isn't actually guarding any memory and we don't @@ -758,8 +630,8 @@ impl Clone for Sender { if actual == curr { return Sender { inner: self.inner.clone(), - sender_task: Arc::new(Mutex::new(SenderTask::new())), - maybe_parked: false, + sender_waker: Arc::new(Mutex::new(SenderWaker::new())), + maybe_blocked: false, }; } @@ -817,7 +689,7 @@ impl Receiver { loop { match unsafe { self.inner.parked_queue.pop() } { PopResult::Data(task) => { - task.lock().unwrap().notify(); + task.lock().unwrap().wake(); } PopResult::Empty => break, PopResult::Inconsistent => thread::yield_now(), @@ -825,7 +697,7 @@ impl Receiver { } } - /// Tries to receive the next message without notifying a context if empty. + /// Tries to receive the next message without wakeing a context if empty. /// /// It is not recommended to call this function from inside of a future, /// only when you've otherwise arranged to be notified when the channel is @@ -846,7 +718,7 @@ impl Receiver { PopResult::Data(msg) => { // If there are any parked task handles in the parked queue, pop // one and unpark it. - self.unpark_one(); + self.wake_one(); // Decrement number of messages self.dec_num_messages(); @@ -876,11 +748,11 @@ impl Receiver { } // Unpark a single task handle if there is one pending in the parked queue - fn unpark_one(&mut self) { + fn wake_one(&mut self) { loop { match unsafe { self.inner.parked_queue.pop() } { PopResult::Data(task) => { - task.lock().unwrap().notify(); + task.lock().unwrap().wake(); return; } PopResult::Empty => { @@ -895,29 +767,6 @@ impl Receiver { } } - // Try to park the receiver task - fn try_park(&self, cx: &mut task::Context) -> TryPark { - let curr = self.inner.state.load(SeqCst); - let state = decode_state(curr); - - // If the channel is closed, then there is no need to park. - if !state.is_open && state.num_messages == 0 { - return TryPark::Closed; - } - - // First, track the task in the `recv_task` slot - let mut recv_task = self.inner.recv_task.lock().unwrap(); - - if recv_task.unparked { - // Consume the `unpark` signal without actually parking - recv_task.unparked = false; - return TryPark::NotEmpty; - } - - recv_task.task = Some(cx.waker().clone()); - TryPark::Parked - } - fn dec_num_messages(&self) { let mut curr = self.inner.state.load(SeqCst); @@ -933,6 +782,21 @@ impl Receiver { } } } + + fn poll_next_no_register(&mut self) -> Async> { + // Try to read a message off of the message queue. + if let Async::Ready(msg) = self.next_message() { + return Async::Ready(msg); + } + + // Check if the channel is closed. + let state = decode_state(self.inner.state.load(SeqCst)); + if !state.is_open && state.num_messages == 0 { + return Async::Ready(None); + } + + Async::Pending + } } impl Stream for Receiver { @@ -940,37 +804,19 @@ impl Stream for Receiver { type Error = Never; fn poll_next(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { - loop { - // Try to read a message off of the message queue. - let msg = match self.next_message() { - Async::Ready(msg) => msg, - Async::Pending => { - // There are no messages to read, in this case, attempt to - // park. The act of parking will verify that the channel is - // still empty after the park operation has completed. - match self.try_park(cx) { - TryPark::Parked => { - // The task was parked, and the channel is still - // empty, return Pending. - return Ok(Async::Pending); - } - TryPark::Closed => { - // The channel is closed, there will be no further - // messages. - return Ok(Async::Ready(None)); - } - TryPark::NotEmpty => { - // A message has been sent while attempting to - // park. Loop again, the next iteration is - // guaranteed to get the message. - continue; - } - } - } - }; - // Return the message - return Ok(Async::Ready(msg)); + if let Async::Ready(x) = self.poll_next_no_register() { + return Ok(Async::Ready(x)); } + + // Register to receive a wakeup when more messages are sent. + self.inner.recv_waker.register(cx.waker()); + + // Check again for messages just in case one arrived in + // between the call to `next_message` and `register` above. + Ok(self.poll_next_no_register()) + + // The channel is not empty, not closed, and + // we're set to receive a wakeup when a message is sent. } } @@ -984,34 +830,6 @@ impl Drop for Receiver { } } -impl UnboundedReceiver { - /// Closes the receiving half of the channel, without dropping it. - /// - /// This prevents any further messages from being sent on the channel while - /// still enabling the receiver to drain messages that are buffered. - pub fn close(&mut self) { - self.0.close(); - } - - /// Tries to receive the next message without notifying a context if empty. - /// - /// It is not recommended to call this function from inside of a future, - /// only when you've otherwise arranged to be notified when the channel is - /// no longer empty. - pub fn try_next(&mut self) -> Result, TryRecvError> { - self.0.try_next() - } -} - -impl Stream for UnboundedReceiver { - type Item = T; - type Error = Never; - - fn poll_next(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { - self.0.poll_next(cx) - } -} - /* * * ===== impl Inner ===== @@ -1054,3 +872,189 @@ fn encode_state(state: &State) -> usize { num } + +/* + * + * ==== Unbounded channels ==== + * + */ + +/// Creates an unbounded mpsc channel for communicating between asynchronous tasks. +/// +/// A `send` on this channel will always succeed as long as the receive half has +/// not been closed. If the receiver falls behind, messages will be arbitrarily +/// buffered. +/// +/// **Note** that the amount of available system memory is an implicit bound to +/// the channel. Using an `unbounded` channel has the ability of causing the +/// process to run out of memory. In this case, the process will be aborted. +pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { + let tx = Arc::new(UnboundedInner { + closed: AtomicBool::new(false), + message_queue: Queue::new(), + recv_waker: AtomicWaker::new(), + }); + let rx = tx.clone(); + (UnboundedSender(tx), UnboundedReceiver(rx)) +} + +/// The transmission end of an unbounded mpsc channel. +/// +/// This value is created by the [`unbounded`](unbounded) function. +#[derive(Debug, Clone)] +pub struct UnboundedSender(Arc>); + +/// The receiving end of an unbounded mpsc channel. +/// +/// This value is created by the [`unbounded`](unbounded) function. +#[derive(Debug)] +pub struct UnboundedReceiver(Arc>); + +trait AssertKinds: Send + Sync + Clone {} +impl AssertKinds for UnboundedSender {} + +#[derive(Debug)] +struct UnboundedInner { + closed: AtomicBool, + message_queue: Queue, + recv_waker: AtomicWaker, +} + +impl UnboundedSender { + /// Check if the channel is ready to receive a message. + pub fn poll_ready(&self, _: &mut task::Context) -> Poll<(), SendError> { + Ok(Async::Ready(())) + } + + /// Returns whether this channel is closed without needing a context. + pub fn is_closed(&self) -> bool { + self.0.closed.load(SeqCst) + } + + /// Closes this channel from the sender side, preventing any new messages. + pub fn close_channel(&self) { + self.0.closed.store(true, SeqCst); + self.0.recv_waker.wake(); + } + + /// Send a message on the channel. + /// + /// This method should only be called after `poll_ready` has been used to + /// verify that the channel is ready to receive a message. + pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { + if self.0.closed.load(SeqCst) { + return Err(SendError::disconnected()); + } + self.0.message_queue.push(msg); + self.0.recv_waker.wake(); + Ok(()) + } + + /// Sends a message along this channel. + /// + /// This is an unbounded sender, so this function differs from `Sink::send` + /// by ensuring the return type reflects that the channel is always ready to + /// receive messages. + pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { + // TODO there's a race between checking the `closed` atomicbool + // and pushing onto the queue. + if self.0.closed.load(SeqCst) { + return Err(TrySendError { + err: SendError::disconnected(), + val: msg, + }); + } + self.0.message_queue.push(msg); + self.0.recv_waker.wake(); + Ok(()) + } +} + +impl Drop for UnboundedSender { + fn drop(&mut self) { + if Arc::strong_count(&self.0) == 2 { + // If it's just us and the reciever, or us and another sender, + // the channel should be closed. + self.0.closed.store(true, SeqCst); + self.0.recv_waker.wake(); + } + } +} + +impl UnboundedReceiver { + /// Closes the receiving half of the channel, without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.0.closed.store(true, SeqCst); + } + + /// Tries to receive the next message without notifying a context if empty. + /// + /// It is not recommended to call this function from inside of a future, + /// only when you've otherwise arranged to be notified when the channel is + /// no longer empty. + pub fn try_next(&mut self) -> Result, TryRecvError> { + loop { + // Safe because this is the only place the message queue is popped, + // and it takes `&mut self` to ensure that only the unique reciever + // can pop off of the message queue. + match unsafe { self.0.message_queue.pop() } { + PopResult::Data(msg) => return Ok(Some(msg)), + PopResult::Empty => { + if self.0.closed.load(SeqCst) { + // Ensure that the `closed` state wasn't written after + // a final message was sent. + match unsafe { self.0.message_queue.pop() } { + PopResult::Data(msg) => return Ok(Some(msg)), + PopResult::Empty => return Ok(None), + PopResult::Inconsistent => { + thread::yield_now(); + continue; + } + } + } + return Err(TryRecvError { _inner: () }); + } + PopResult::Inconsistent => { + // Inconsistent means that there will be a message to pop + // in a short time. This branch can only be reached if + // values are being produced from another thread, so there + // are a few ways that we can deal with this: + // + // 1) Spin + // 2) thread::yield_now() + // 3) task::current().unwrap() & return Pending + // + // For now, thread::yield_now() is used, but it would + // probably be better to spin a few times then yield. + thread::yield_now(); + } + } + } + } +} + +impl Stream for UnboundedReceiver { + type Item = T; + type Error = Never; + + fn poll_next(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { + if let Ok(msg) = self.try_next() { + return Ok(Async::Ready(msg)); + } + self.0.recv_waker.register(cx.waker()); + if let Ok(msg) = self.try_next() { + return Ok(Async::Ready(msg)); + } + Ok(Async::Pending) + } +} + +impl Drop for UnboundedReceiver { + fn drop(&mut self) { + self.0.closed.store(true, SeqCst); + } +} + diff --git a/futures-channel/src/mpsc/queue.rs b/futures-channel/src/mpsc/queue.rs index 0c4d7953e4..15af832b49 100644 --- a/futures-channel/src/mpsc/queue.rs +++ b/futures-channel/src/mpsc/queue.rs @@ -80,7 +80,7 @@ unsafe impl Send for Queue { } unsafe impl Sync for Queue { } impl Node { - unsafe fn new(v: Option) -> *mut Node { + fn new(v: Option) -> *mut Node { Box::into_raw(Box::new(Node { next: AtomicPtr::new(ptr::null_mut()), value: v, @@ -92,7 +92,7 @@ impl Queue { /// Creates a new queue that is safe to share among multiple producers and /// one consumer. pub fn new() -> Queue { - let stub = unsafe { Node::new(None) }; + let stub = Node::new(None); Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub), diff --git a/futures-channel/tests/mpsc.rs b/futures-channel/tests/mpsc.rs index 9d7395a070..a3fdba69d0 100644 --- a/futures-channel/tests/mpsc.rs +++ b/futures-channel/tests/mpsc.rs @@ -12,7 +12,7 @@ use futures::future::poll_fn; use futures::task; use futures_channel::mpsc; use futures_channel::oneshot; -use futures_executor::block_on; +use futures_executor::{block_on, block_on_stream}; trait AssertSend: Send {} impl AssertSend for mpsc::Sender {} @@ -94,20 +94,15 @@ fn send_recv_threads() { #[test] fn send_recv_threads_no_capacity() { let (tx, rx) = mpsc::channel::(0); + let mut rx = block_on_stream(rx); - let (readytx, readyrx) = mpsc::channel::<()>(2); let t = thread::spawn(move|| { - let readytx = readytx.sink_map_err(|_| panic!()); - let (a, b) = block_on(tx.send(1).join(readytx.send(()))).unwrap(); - block_on(a.send(2).join(b.send(()))).unwrap(); + let tx = block_on(tx.send(1)).unwrap(); + block_on(tx.send(2)).unwrap(); }); - let readyrx = block_on(readyrx.next()).ok().unwrap().1; - let (item, rx) = block_on(rx.next()).ok().unwrap(); - assert_eq!(item, Some(1)); - drop(block_on(readyrx.next()).ok().unwrap()); - let item = block_on(rx.next()).ok().unwrap().0; - assert_eq!(item, Some(2)); + assert_eq!(rx.next(), Some(Ok(1))); + assert_eq!(rx.next(), Some(Ok(2))); t.join().unwrap(); } @@ -401,7 +396,8 @@ fn stress_close_receiver_iter() { Some(r) => assert!(i == r), None => { let unwritten = unwritten_rx.recv().expect("unwritten_rx"); - assert_eq!(unwritten, i); + assert!(unwritten <= i + 1); + assert!(unwritten >= i); th.join().unwrap(); return; } diff --git a/futures-sink/src/channel_impls.rs b/futures-sink/src/channel_impls.rs index f79c4b6bab..2aec273bb4 100644 --- a/futures-sink/src/channel_impls.rs +++ b/futures-sink/src/channel_impls.rs @@ -14,8 +14,14 @@ impl Sink for Sender { self.start_send(msg) } - fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> { - Ok(Async::Ready(())) + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + match self.poll_ready(cx) { + Err(ref e) if e.is_disconnected() => { + // If the receiver disconnected, we consider the sink to be flushed. + Ok(Async::Ready(())) + } + x => x, + } } fn poll_close(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> { diff --git a/futures/tests/sink.rs b/futures/tests/sink.rs index 17ec5277ba..05f8e8970c 100644 --- a/futures/tests/sink.rs +++ b/futures/tests/sink.rs @@ -413,8 +413,8 @@ fn fanout_smoke() { #[test] fn fanout_backpressure() { - let (left_send, left_recv) = mpsc::channel(0); - let (right_send, right_recv) = mpsc::channel(0); + let (left_send, left_recv) = mpsc::channel(1); + let (right_send, right_recv) = mpsc::channel(1); let sink = left_send.fanout(right_send); let sink = block_on(StartSendFut::new(sink, 0)).unwrap();