Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Condvar APIs not susceptible to spurious wake #47970

Merged
merged 9 commits into from
Feb 25, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 214 additions & 2 deletions src/libstd/sync/condvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use sync::{mutex, MutexGuard, PoisonError};
use sys_common::condvar as sys;
use sys_common::mutex as sys_mutex;
use sys_common::poison::{self, LockResult};
use time::Duration;
use time::{Duration, Instant};

/// A type indicating whether a timed wait on a condition variable returned
/// due to a time out or not.
Expand Down Expand Up @@ -219,6 +219,64 @@ impl Condvar {
}
}

/// Blocks the current thread until this condition variable receives a
/// notification and the required condition is met. Spurious wakeups are
/// ignored and this function will only return once the condition has been
/// met.
///
/// This function will atomically unlock the mutex specified (represented by
/// `guard`) and block the current thread. This means that any calls
/// to [`notify_one`] or [`notify_all`] which happen logically after the
/// mutex is unlocked are candidates to wake this thread up. When this
/// function call returns, the lock specified will have been re-acquired.
///
/// # Errors
///
/// This function will return an error if the mutex being waited on is
/// poisoned when this thread re-acquires the lock. For more information,
/// see information about [poisoning] on the [`Mutex`] type.
///
/// [`notify_one`]: #method.notify_one
/// [`notify_all`]: #method.notify_all
/// [poisoning]: ../sync/struct.Mutex.html#poisoning
/// [`Mutex`]: ../sync/struct.Mutex.html
///
/// # Examples
///
/// ```
/// #![feature(wait_until)]
///
/// use std::sync::{Arc, Mutex, Condvar};
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// thread::spawn(move|| {
/// let &(ref lock, ref cvar) = &*pair2;
/// let mut started = lock.lock().unwrap();
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // Wait for the thread to start up.
/// let &(ref lock, ref cvar) = &*pair;
/// // As long as the value inside the `Mutex` is false, we wait.
/// let _guard = cvar.wait_until(lock.lock().unwrap(), |started| { *started }).unwrap();
/// ```
#[unstable(feature = "wait_until", issue = "47960")]
pub fn wait_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>,
mut condition: F)
-> LockResult<MutexGuard<'a, T>>
where F: FnMut(&mut T) -> bool {
while !condition(&mut *guard) {
guard = self.wait(guard)?;
}
Ok(guard)
}


/// Waits on this condition variable for a notification, timing out after a
/// specified duration.
///
Expand Down Expand Up @@ -293,7 +351,15 @@ impl Condvar {
///
/// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to
/// the system time.
/// the system time. This function is susceptible to spurious wakeups.
/// Condition variables normally have a boolean predicate associated with
/// them, and the predicate must always be checked each time this function
/// returns to protect against spurious wakeups. Additionally, it is
/// typically desirable for the time-out to not exceed some duration in
/// spite of spurious wakes, thus the sleep-duration is decremented by the
/// amount slept. Alternatively, use the `wait_timeout_until` method
/// to wait until a condition is met with a total time-out regardless
/// of spurious wakes.
///
/// The returned [`WaitTimeoutResult`] value indicates if the timeout is
/// known to have elapsed.
Expand All @@ -302,6 +368,7 @@ impl Condvar {
/// returns, regardless of whether the timeout elapsed or not.
///
/// [`wait`]: #method.wait
/// [`wait_timeout_until`]: #method.wait_timeout_until
/// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html
///
/// # Examples
Expand Down Expand Up @@ -353,6 +420,80 @@ impl Condvar {
}
}

/// Waits on this condition variable for a notification, timing out after a
/// specified duration. Spurious wakes will not cause this function to
/// return.
///
/// The semantics of this function are equivalent to [`wait_until`] except
/// that the thread will be blocked for roughly no longer than `dur`. This
/// method should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum
/// amount of time waited to be precisely `dur`.
///
/// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to
/// the system time.
///
/// The returned [`WaitTimeoutResult`] value indicates if the timeout is
/// known to have elapsed without the condition being met.
///
/// Like [`wait_until`], the lock specified will be re-acquired when this
/// function returns, regardless of whether the timeout elapsed or not.
///
/// [`wait_until`]: #method.wait_until
/// [`wait_timeout`]: #method.wait_timeout
/// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html
///
/// # Examples
///
/// ```
/// #![feature(wait_timeout_until)]
///
/// use std::sync::{Arc, Mutex, Condvar};
/// use std::thread;
/// use std::time::Duration;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// thread::spawn(move|| {
/// let &(ref lock, ref cvar) = &*pair2;
/// let mut started = lock.lock().unwrap();
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let &(ref lock, ref cvar) = &*pair;
/// let result = cvar.wait_timeout_until(
/// lock.lock().unwrap(),
/// Duration::from_millis(100),
/// |&mut started| started,
/// ).unwrap();
/// if result.1.timed_out() {
/// // timed-out without the condition ever evaluating to true.
/// }
/// // access the locked mutex via result.0
/// ```
#[unstable(feature = "wait_timeout_until", issue = "47960")]
pub fn wait_timeout_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>,
dur: Duration, mut condition: F)
-> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)>
where F: FnMut(&mut T) -> bool {
let start = Instant::now();
loop {
if condition(&mut *guard) {
return Ok((guard, WaitTimeoutResult(false)));
}
let timeout = match dur.checked_sub(start.elapsed()) {
Some(timeout) => timeout,
None => return Ok((guard, WaitTimeoutResult(true))),
};
guard = self.wait_timeout(guard, timeout)?.0;
}
}

/// Wakes up one blocked thread on this condvar.
///
/// If there is a blocked thread on this condition variable, then it will
Expand Down Expand Up @@ -478,6 +619,7 @@ impl Drop for Condvar {

#[cfg(test)]
mod tests {
/// #![feature(wait_until)]
use sync::mpsc::channel;
use sync::{Condvar, Mutex, Arc};
use sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -546,6 +688,29 @@ mod tests {
}
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_until() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();

// Inside of our lock, spawn a new thread, and then wait for it to start.
thread::spawn(move|| {
let &(ref lock, ref cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
// We notify the condvar that the value has changed.
cvar.notify_one();
});

// Wait for the thread to start up.
let &(ref lock, ref cvar) = &*pair;
let guard = cvar.wait_until(lock.lock().unwrap(), |started| {
*started
});
assert!(*guard.unwrap());
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_wait() {
Expand All @@ -565,6 +730,53 @@ mod tests {
}
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_until_wait() {
let m = Arc::new(Mutex::new(()));
let c = Arc::new(Condvar::new());

let g = m.lock().unwrap();
let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(1), |_| { false }).unwrap();
// no spurious wakeups. ensure it timed-out
assert!(wait.timed_out());
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_until_instant_satisfy() {
let m = Arc::new(Mutex::new(()));
let c = Arc::new(Condvar::new());

let g = m.lock().unwrap();
let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(0), |_| { true }).unwrap();
// ensure it didn't time-out even if we were not given any time.
assert!(!wait.timed_out());
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_until_wake() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_copy = pair.clone();

let &(ref m, ref c) = &*pair;
let g = m.lock().unwrap();
let _t = thread::spawn(move || {
let &(ref lock, ref cvar) = &*pair_copy;
let mut started = lock.lock().unwrap();
thread::sleep(Duration::from_millis(1));
*started = true;
cvar.notify_one();
});
let (g2, wait) = c.wait_timeout_until(g, Duration::from_millis(u64::MAX), |&mut notified| {
notified
}).unwrap();
// ensure it didn't time-out even if we were not given any time.
assert!(!wait.timed_out());
assert!(*g2);
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_wake() {
Expand Down