diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index 564021758176b..3e40cbb37e3d8 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -14,7 +14,7 @@ use sync::{mutex, MutexGuard, PoisonError}; use sys_common::condvar as sys; use sys_common::mutex as sys_mutex; use sys_common::poison::{self, LockResult}; -use time::Duration; +use time::{Duration, Instant}; /// A type indicating whether a timed wait on a condition variable returned /// due to a time out or not. @@ -219,6 +219,64 @@ impl Condvar { } } + /// Blocks the current thread until this condition variable receives a + /// notification and the required condition is met. Spurious wakeups are + /// ignored and this function will only return once the condition has been + /// met. + /// + /// This function will atomically unlock the mutex specified (represented by + /// `guard`) and block the current thread. This means that any calls + /// to [`notify_one`] or [`notify_all`] which happen logically after the + /// mutex is unlocked are candidates to wake this thread up. When this + /// function call returns, the lock specified will have been re-acquired. + /// + /// # Errors + /// + /// This function will return an error if the mutex being waited on is + /// poisoned when this thread re-acquires the lock. For more information, + /// see information about [poisoning] on the [`Mutex`] type. + /// + /// [`notify_one`]: #method.notify_one + /// [`notify_all`]: #method.notify_all + /// [poisoning]: ../sync/struct.Mutex.html#poisoning + /// [`Mutex`]: ../sync/struct.Mutex.html + /// + /// # Examples + /// + /// ``` + /// #![feature(wait_until)] + /// + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// thread::spawn(move|| { + /// let &(ref lock, ref cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let &(ref lock, ref cvar) = &*pair; + /// // As long as the value inside the `Mutex` is false, we wait. + /// let _guard = cvar.wait_until(lock.lock().unwrap(), |started| { *started }).unwrap(); + /// ``` + #[unstable(feature = "wait_until", issue = "47960")] + pub fn wait_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>, + mut condition: F) + -> LockResult> + where F: FnMut(&mut T) -> bool { + while !condition(&mut *guard) { + guard = self.wait(guard)?; + } + Ok(guard) + } + + /// Waits on this condition variable for a notification, timing out after a /// specified duration. /// @@ -293,7 +351,15 @@ impl Condvar { /// /// Note that the best effort is made to ensure that the time waited is /// measured with a monotonic clock, and not affected by the changes made to - /// the system time. + /// the system time. This function is susceptible to spurious wakeups. + /// Condition variables normally have a boolean predicate associated with + /// them, and the predicate must always be checked each time this function + /// returns to protect against spurious wakeups. Additionally, it is + /// typically desirable for the time-out to not exceed some duration in + /// spite of spurious wakes, thus the sleep-duration is decremented by the + /// amount slept. Alternatively, use the `wait_timeout_until` method + /// to wait until a condition is met with a total time-out regardless + /// of spurious wakes. /// /// The returned [`WaitTimeoutResult`] value indicates if the timeout is /// known to have elapsed. @@ -302,6 +368,7 @@ impl Condvar { /// returns, regardless of whether the timeout elapsed or not. /// /// [`wait`]: #method.wait + /// [`wait_timeout_until`]: #method.wait_timeout_until /// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html /// /// # Examples @@ -353,6 +420,80 @@ impl Condvar { } } + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. Spurious wakes will not cause this function to + /// return. + /// + /// The semantics of this function are equivalent to [`wait_until`] except + /// that the thread will be blocked for roughly no longer than `dur`. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that may not cause the maximum + /// amount of time waited to be precisely `dur`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. + /// + /// The returned [`WaitTimeoutResult`] value indicates if the timeout is + /// known to have elapsed without the condition being met. + /// + /// Like [`wait_until`], the lock specified will be re-acquired when this + /// function returns, regardless of whether the timeout elapsed or not. + /// + /// [`wait_until`]: #method.wait_until + /// [`wait_timeout`]: #method.wait_timeout + /// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html + /// + /// # Examples + /// + /// ``` + /// #![feature(wait_timeout_until)] + /// + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// use std::time::Duration; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// thread::spawn(move|| { + /// let &(ref lock, ref cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // wait for the thread to start up + /// let &(ref lock, ref cvar) = &*pair; + /// let result = cvar.wait_timeout_until( + /// lock.lock().unwrap(), + /// Duration::from_millis(100), + /// |&mut started| started, + /// ).unwrap(); + /// if result.1.timed_out() { + /// // timed-out without the condition ever evaluating to true. + /// } + /// // access the locked mutex via result.0 + /// ``` + #[unstable(feature = "wait_timeout_until", issue = "47960")] + pub fn wait_timeout_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>, + dur: Duration, mut condition: F) + -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> + where F: FnMut(&mut T) -> bool { + let start = Instant::now(); + loop { + if condition(&mut *guard) { + return Ok((guard, WaitTimeoutResult(false))); + } + let timeout = match dur.checked_sub(start.elapsed()) { + Some(timeout) => timeout, + None => return Ok((guard, WaitTimeoutResult(true))), + }; + guard = self.wait_timeout(guard, timeout)?.0; + } + } + /// Wakes up one blocked thread on this condvar. /// /// If there is a blocked thread on this condition variable, then it will @@ -478,6 +619,7 @@ impl Drop for Condvar { #[cfg(test)] mod tests { + /// #![feature(wait_until)] use sync::mpsc::channel; use sync::{Condvar, Mutex, Arc}; use sync::atomic::{AtomicBool, Ordering}; @@ -546,6 +688,29 @@ mod tests { } } + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_until() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair2 = pair.clone(); + + // Inside of our lock, spawn a new thread, and then wait for it to start. + thread::spawn(move|| { + let &(ref lock, ref cvar) = &*pair2; + let mut started = lock.lock().unwrap(); + *started = true; + // We notify the condvar that the value has changed. + cvar.notify_one(); + }); + + // Wait for the thread to start up. + let &(ref lock, ref cvar) = &*pair; + let guard = cvar.wait_until(lock.lock().unwrap(), |started| { + *started + }); + assert!(*guard.unwrap()); + } + #[test] #[cfg_attr(target_os = "emscripten", ignore)] fn wait_timeout_wait() { @@ -565,6 +730,53 @@ mod tests { } } + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_timeout_until_wait() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let g = m.lock().unwrap(); + let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(1), |_| { false }).unwrap(); + // no spurious wakeups. ensure it timed-out + assert!(wait.timed_out()); + } + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_timeout_until_instant_satisfy() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let g = m.lock().unwrap(); + let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(0), |_| { true }).unwrap(); + // ensure it didn't time-out even if we were not given any time. + assert!(!wait.timed_out()); + } + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_timeout_until_wake() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair_copy = pair.clone(); + + let &(ref m, ref c) = &*pair; + let g = m.lock().unwrap(); + let _t = thread::spawn(move || { + let &(ref lock, ref cvar) = &*pair_copy; + let mut started = lock.lock().unwrap(); + thread::sleep(Duration::from_millis(1)); + *started = true; + cvar.notify_one(); + }); + let (g2, wait) = c.wait_timeout_until(g, Duration::from_millis(u64::MAX), |&mut notified| { + notified + }).unwrap(); + // ensure it didn't time-out even if we were not given any time. + assert!(!wait.timed_out()); + assert!(*g2); + } + #[test] #[cfg_attr(target_os = "emscripten", ignore)] fn wait_timeout_wake() {