Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the unstable and deprecated mpsc_select #60921

Merged
merged 1 commit into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 0 additions & 55 deletions src/libstd/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,61 +357,6 @@ macro_rules! dbg {
};
}

/// Selects the first successful receive event from a number of receivers.
///
/// This macro is used to wait for the first event to occur on a number of
/// receivers. It places no restrictions on the types of receivers given to
/// this macro, this can be viewed as a heterogeneous select.
///
/// # Examples
///
/// ```
/// #![feature(mpsc_select)]
///
/// use std::thread;
/// use std::sync::mpsc;
///
/// // two placeholder functions for now
/// fn long_running_thread() {}
/// fn calculate_the_answer() -> u32 { 42 }
///
/// let (tx1, rx1) = mpsc::channel();
/// let (tx2, rx2) = mpsc::channel();
///
/// thread::spawn(move|| { long_running_thread(); tx1.send(()).unwrap(); });
/// thread::spawn(move|| { tx2.send(calculate_the_answer()).unwrap(); });
///
/// select! {
/// _ = rx1.recv() => println!("the long running thread finished first"),
/// answer = rx2.recv() => {
/// println!("the answer was: {}", answer.unwrap());
/// }
/// }
/// # drop(rx1.recv());
/// # drop(rx2.recv());
/// ```
///
/// For more information about select, see the `std::sync::mpsc::Select` structure.
#[macro_export]
#[unstable(feature = "mpsc_select", issue = "27800")]
#[rustc_deprecated(since = "1.32.0",
reason = "channel selection will be removed in a future release")]
macro_rules! select {
(
$($name:pat = $rx:ident.$meth:ident() => $code:expr),+
) => ({
use $crate::sync::mpsc::Select;
let sel = Select::new();
$( let mut $rx = sel.handle(&$rx); )+
unsafe {
$( $rx.add(); )+
}
let ret = sel.wait();
$( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
{ unreachable!() }
})
}

#[cfg(test)]
macro_rules! assert_approx_eq {
($a:expr, $b:expr) => ({
Expand Down
85 changes: 2 additions & 83 deletions src/libstd/sync/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@
//! ```

#![stable(feature = "rust1", since = "1.0.0")]
#![allow(deprecated)] // for mpsc_select

// A description of how Rust's channel implementation works
//
Expand Down Expand Up @@ -263,6 +262,8 @@
// believe that there is anything fundamental that needs to change about these
// channels, however, in order to support a more efficient select().
//
// FIXME: Select is now removed, so these factors are ready to be cleaned up!
//
// # Conclusion
//
// And now that you've seen all the races that I found and attempted to fix,
Expand All @@ -275,18 +276,8 @@ use crate::mem;
use crate::cell::UnsafeCell;
use crate::time::{Duration, Instant};

#[unstable(feature = "mpsc_select", issue = "27800")]
pub use self::select::{Select, Handle};
use self::select::StartResult;
use self::select::StartResult::*;
use self::blocking::SignalToken;

#[cfg(all(test, not(target_os = "emscripten")))]
mod select_tests;

mod blocking;
mod oneshot;
mod select;
mod shared;
mod stream;
mod sync;
Expand Down Expand Up @@ -1514,78 +1505,6 @@ impl<T> Receiver<T> {

}

impl<T> select::Packet for Receiver<T> {
fn can_recv(&self) -> bool {
loop {
let new_port = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match p.can_recv() {
Ok(ret) => return ret,
Err(upgrade) => upgrade,
}
}
Flavor::Stream(ref p) => {
match p.can_recv() {
Ok(ret) => return ret,
Err(upgrade) => upgrade,
}
}
Flavor::Shared(ref p) => return p.can_recv(),
Flavor::Sync(ref p) => return p.can_recv(),
};
unsafe {
mem::swap(self.inner_mut(),
new_port.inner_mut());
}
}
}

fn start_selection(&self, mut token: SignalToken) -> StartResult {
loop {
let (t, new_port) = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match p.start_selection(token) {
oneshot::SelSuccess => return Installed,
oneshot::SelCanceled => return Abort,
oneshot::SelUpgraded(t, rx) => (t, rx),
}
}
Flavor::Stream(ref p) => {
match p.start_selection(token) {
stream::SelSuccess => return Installed,
stream::SelCanceled => return Abort,
stream::SelUpgraded(t, rx) => (t, rx),
}
}
Flavor::Shared(ref p) => return p.start_selection(token),
Flavor::Sync(ref p) => return p.start_selection(token),
};
token = t;
unsafe {
mem::swap(self.inner_mut(), new_port.inner_mut());
}
}
}

fn abort_selection(&self) -> bool {
let mut was_upgrade = false;
loop {
let result = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => p.abort_selection(),
Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
Flavor::Sync(ref p) => return p.abort_selection(),
};
let new_port = match result { Ok(b) => return b, Err(p) => p };
was_upgrade = true;
unsafe {
mem::swap(self.inner_mut(),
new_port.inner_mut());
}
}
}
}

#[stable(feature = "rust1", since = "1.0.0")]
impl<'a, T> Iterator for Iter<'a, T> {
type Item = T;
Expand Down
72 changes: 0 additions & 72 deletions src/libstd/sync/mpsc/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

pub use self::Failure::*;
pub use self::UpgradeResult::*;
pub use self::SelectionResult::*;
use self::MyUpgrade::*;

use crate::sync::mpsc::Receiver;
Expand Down Expand Up @@ -66,12 +65,6 @@ pub enum UpgradeResult {
UpWoke(SignalToken),
}

pub enum SelectionResult<T> {
SelCanceled,
SelUpgraded(SignalToken, Receiver<T>),
SelSuccess,
}

enum MyUpgrade<T> {
NothingSent,
SendUsed,
Expand Down Expand Up @@ -264,71 +257,6 @@ impl<T> Packet<T> {
// select implementation
////////////////////////////////////////////////////////////////////////////

// If Ok, the value is whether this port has data, if Err, then the upgraded
// port needs to be checked instead of this one.
pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
unsafe {
match self.state.load(Ordering::SeqCst) {
EMPTY => Ok(false), // Welp, we tried
DATA => Ok(true), // we have some un-acquired data
DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data
DISCONNECTED => {
match ptr::replace(self.upgrade.get(), SendUsed) {
// The other end sent us an upgrade, so we need to
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => Err(upgrade),

// If the other end disconnected without sending an
// upgrade, then we have data to receive (the channel is
// disconnected).
up => { ptr::write(self.upgrade.get(), up); Ok(true) }
}
}
_ => unreachable!(), // we're the "one blocker"
}
}
}

// Attempts to start selection on this port. This can either succeed, fail
// because there is data, or fail because there is an upgrade pending.
pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
unsafe {
let ptr = token.cast_to_usize();
match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
EMPTY => SelSuccess,
DATA => {
drop(SignalToken::cast_from_usize(ptr));
SelCanceled
}
DISCONNECTED if (*self.data.get()).is_some() => {
drop(SignalToken::cast_from_usize(ptr));
SelCanceled
}
DISCONNECTED => {
match ptr::replace(self.upgrade.get(), SendUsed) {
// The other end sent us an upgrade, so we need to
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => {
SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade)
}

// If the other end disconnected without sending an
// upgrade, then we have data to receive (the channel is
// disconnected).
up => {
ptr::write(self.upgrade.get(), up);
drop(SignalToken::cast_from_usize(ptr));
SelCanceled
}
}
}
_ => unreachable!(), // we're the "one blocker"
}
}
}

// Remove a previous selecting thread from this port. This ensures that the
// blocked thread will no longer be visible to any other threads.
//
Expand Down
Loading