Skip to content

Commit

Permalink
macros: introduce biased argument for select! (#3603)
Browse files Browse the repository at this point in the history
  • Loading branch information
lilymara-onesignal authored Mar 12, 2021
1 parent a1b4bde commit b75d02a
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 26 deletions.
119 changes: 93 additions & 26 deletions tokio/src/macros/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,24 @@
///
/// ### Fairness
///
/// `select!` randomly picks a branch to check first. This provides some level
/// of fairness when calling `select!` in a loop with branches that are always
/// By default, `select!` randomly picks a branch to check first. This provides
/// some level of fairness when calling `select!` in a loop with branches that
/// are always ready.
///
/// This behavior can be overridden by adding `biased;` to the beginning of the
/// macro usage. See the exmples for details. This will cause `select` to poll
/// the futures in the order they appear from top to bottom. There are a few
/// reasons you may want this:
///
/// - The random number generation of `tokio::select!` has a non-zero CPU cost
/// - Your futures may interact in a way where known polling order is significant
///
/// But there is an important caveat to this mode. It becomes your responsibility
/// to ensure that the polling order of your futures is fair. If for example you
/// are selecting between a stream and a shutdown future, and the stream has a
/// huge volume of messages and zero or nearly zero time between them, you should
/// place the shutdown future earlier in the `select!` list to ensure that it is
/// always polled, and will not be ignored due to the stream being constantly
/// ready.
///
/// # Panics
Expand Down Expand Up @@ -283,6 +299,45 @@
/// assert_eq!(res.1, "second");
/// }
/// ```
///
/// Using the `biased;` mode to control polling order.
///
/// ```
/// #[tokio::main]
/// async fn main() {
/// let mut count = 0u8;
///
/// loop {
/// tokio::select! {
/// // If you run this example without `biased;`, the polling order is
/// // psuedo-random, and the assertions on the value of count will
/// // (probably) fail.
/// biased;
///
/// _ = async {}, if count < 1 => {
/// count += 1;
/// assert_eq!(count, 1);
/// }
/// _ = async {}, if count < 2 => {
/// count += 1;
/// assert_eq!(count, 2);
/// }
/// _ = async {}, if count < 3 => {
/// count += 1;
/// assert_eq!(count, 3);
/// }
/// _ = async {}, if count < 4 => {
/// count += 1;
/// assert_eq!(count, 4);
/// }
///
/// else => {
/// break;
/// }
/// };
/// }
/// }
/// ```
#[macro_export]
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
macro_rules! select {
Expand All @@ -300,6 +355,10 @@ macro_rules! select {

// All input is normalized, now transform.
(@ {
// The index of the future to poll first (in bias mode), or the RNG
// expression to use to pick a future to poll first.
start=$start:expr;

// One `_` for each branch in the `select!` macro. Passing this to
// `count!` converts $skip to an integer.
( $($count:tt)* )
Expand Down Expand Up @@ -357,9 +416,11 @@ macro_rules! select {
// disabled.
let mut is_pending = false;

// Randomly generate a starting point. This makes `select!` a
// bit more fair and avoids always polling the first future.
let start = $crate::macros::support::thread_rng_n(BRANCHES);
// Choose a starting index to begin polling the futures at. In
// practice, this will either be a psuedo-randomly generrated
// number by default, or the constant 0 if `biased;` is
// supplied.
let start = $start;

for i in 0..BRANCHES {
let branch;
Expand Down Expand Up @@ -444,42 +505,48 @@ macro_rules! select {
// These rules match a single `select!` branch and normalize it for
// processing by the first rule.

(@ { $($t:tt)* } ) => {
(@ { start=$start:expr; $($t:tt)* } ) => {
// No `else` branch
$crate::select!(@{ $($t)*; panic!("all branches are disabled and there is no else branch") })
$crate::select!(@{ start=$start; $($t)*; panic!("all branches are disabled and there is no else branch") })
};
(@ { $($t:tt)* } else => $else:expr $(,)?) => {
$crate::select!(@{ $($t)*; $else })
(@ { start=$start:expr; $($t:tt)* } else => $else:expr $(,)?) => {
$crate::select!(@{ start=$start; $($t)*; $else })
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:block, $($r:tt)* ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, } $($r)*)
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:block, $($r:tt)* ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, } $($r)*)
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:block, $($r:tt)* ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, } $($r)*)
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:block, $($r:tt)* ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, } $($r)*)
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:block $($r:tt)* ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, } $($r)*)
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:block $($r:tt)* ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, } $($r)*)
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:block $($r:tt)* ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, } $($r)*)
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:block $($r:tt)* ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, } $($r)*)
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:expr ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, })
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:expr ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, })
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:expr ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, })
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:expr ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, })
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:expr, $($r:tt)* ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, } $($r)*)
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:expr, $($r:tt)* ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, } $($r)*)
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:expr, $($r:tt)* ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, } $($r)*)
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:expr, $($r:tt)* ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, } $($r)*)
};

// ===== Entry point =====

(biased; $p:pat = $($t:tt)* ) => {
$crate::select!(@{ start=0; () } $p = $($t)*)
};

( $p:pat = $($t:tt)* ) => {
$crate::select!(@{ () } $p = $($t)*)
// Randomly generate a starting point. This makes `select!` a bit more
// fair and avoids always polling the first future.
$crate::select!(@{ start={ $crate::macros::support::thread_rng_n(BRANCHES) }; () } $p = $($t)*)
};
() => {
compile_error!("select! requires at least one branch.")
Expand Down
59 changes: 59 additions & 0 deletions tokio/tests/macros_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,3 +481,62 @@ async fn mut_on_left_hand_side() {
.await;
assert_eq!(v, 2);
}

#[tokio::test]
async fn biased_one_not_ready() {
let (_tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = oneshot::channel::<i32>();
let (tx3, rx3) = oneshot::channel::<i32>();

tx2.send(2).unwrap();
tx3.send(3).unwrap();

let v = tokio::select! {
biased;

_ = rx1 => unreachable!(),
res = rx2 => {
assert_ok!(res)
},
_ = rx3 => {
panic!("This branch should never be activated because `rx2` should be polled before `rx3` due to `biased;`.")
}
};

assert_eq!(2, v);
}

#[tokio::test]
async fn biased_eventually_ready() {
use tokio::task::yield_now;

let one = async {};
let two = async { yield_now().await };
let three = async { yield_now().await };

let mut count = 0u8;

tokio::pin!(one, two, three);

loop {
tokio::select! {
biased;

_ = &mut two, if count < 2 => {
count += 1;
assert_eq!(count, 2);
}
_ = &mut three, if count < 3 => {
count += 1;
assert_eq!(count, 3);
}
_ = &mut one, if count < 1 => {
count += 1;
assert_eq!(count, 1);
}
else => break,
}
}

assert_eq!(count, 3);
}

0 comments on commit b75d02a

Please sign in to comment.