diff --git a/src/lib.rs b/src/lib.rs index de6d526..3a3b802 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,6 +96,12 @@ fn duration_max() -> Duration { Duration::new(std::u64::MAX, 1_000_000_000 - 1) } +fn instant_max() -> Instant { + // In order to ensure this point in time is never reached, it + // is put 30 years into the future. + Instant::now() + Duration::from_secs(86400 * 365 * 30) +} + /// A future or stream that emits timed events. /// /// Timers are futures that output a single [`Instant`] when they fire. @@ -139,13 +145,57 @@ pub struct Timer { id_and_waker: Option<(usize, Waker)>, /// The next instant at which this timer fires. - when: Instant, + /// + /// If this timer is a blank timer, this value is None. If the timer + /// must be set, this value contains the next instant at which the + /// timer must fire. + when: Option, /// The period. period: Duration, } impl Timer { + /// Creates a timer that will never fire. + /// + /// # Examples + /// + /// This function may also be useful for creating a function with an optional timeout. + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_io::Timer; + /// use futures_lite::prelude::*; + /// use std::time::Duration; + /// + /// async fn run_with_timeout(timeout: Option) { + /// let timer = timeout + /// .map(|timeout| Timer::after(timeout)) + /// .unwrap_or_else(Timer::never); + /// + /// run_lengthy_operation().or(timer).await; + /// } + /// # // Note that since a Timer as a Future returns an Instant, + /// # // this function needs to return an Instant to be used + /// # // in "or". + /// # async fn run_lengthy_operation() -> std::time::Instant { + /// # std::time::Instant::now() + /// # } + /// + /// // Times out after 5 seconds. + /// run_with_timeout(Some(Duration::from_secs(5))).await; + /// // Does not time out. + /// run_with_timeout(None).await; + /// # }); + /// ``` + pub fn never() -> Timer { + Timer { + id_and_waker: None, + when: None, + period: duration_max(), + } + } + /// Creates a timer that emits an event once after the given duration of time. /// /// # Examples @@ -159,7 +209,11 @@ impl Timer { /// # }); /// ``` pub fn after(duration: Duration) -> Timer { - Timer::at(Instant::now() + duration) + Timer::at( + Instant::now() + .checked_add(duration) + .unwrap_or_else(instant_max), + ) } /// Creates a timer that emits an event once at the given time instant. @@ -196,7 +250,12 @@ impl Timer { /// # }); /// ``` pub fn interval(period: Duration) -> Timer { - Timer::interval_at(Instant::now() + period, period) + Timer::interval_at( + Instant::now() + .checked_add(period) + .unwrap_or_else(instant_max), + period, + ) } /// Creates a timer that emits events periodically, starting at `start`. @@ -217,7 +276,7 @@ impl Timer { pub fn interval_at(start: Instant, period: Duration) -> Timer { Timer { id_and_waker: None, - when: start, + when: Some(start), period, } } @@ -240,7 +299,11 @@ impl Timer { /// # }); /// ``` pub fn set_after(&mut self, duration: Duration) { - self.set_at(Instant::now() + duration); + self.set_at( + Instant::now() + .checked_add(duration) + .unwrap_or_else(instant_max), + ); } /// Sets the timer to emit an event once at the given time instant. @@ -264,17 +327,17 @@ impl Timer { /// # }); /// ``` pub fn set_at(&mut self, instant: Instant) { - if let Some((id, _)) = self.id_and_waker.as_ref() { + if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.as_ref()) { // Deregister the timer from the reactor. - Reactor::get().remove_timer(self.when, *id); + Reactor::get().remove_timer(when, *id); } // Update the timeout. - self.when = instant; + self.when = Some(instant); if let Some((id, waker)) = self.id_and_waker.as_mut() { // Re-register the timer with the new timeout. - *id = Reactor::get().insert_timer(self.when, waker); + *id = Reactor::get().insert_timer(instant, waker); } } @@ -299,7 +362,12 @@ impl Timer { /// # }); /// ``` pub fn set_interval(&mut self, period: Duration) { - self.set_interval_at(Instant::now() + period, period); + self.set_interval_at( + Instant::now() + .checked_add(period) + .unwrap_or_else(instant_max), + period, + ); } /// Sets the timer to emit events periodically, starting at `start`. @@ -324,26 +392,26 @@ impl Timer { /// # }); /// ``` pub fn set_interval_at(&mut self, start: Instant, period: Duration) { - if let Some((id, _)) = self.id_and_waker.as_ref() { + if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.as_ref()) { // Deregister the timer from the reactor. - Reactor::get().remove_timer(self.when, *id); + Reactor::get().remove_timer(when, *id); } - self.when = start; + self.when = Some(start); self.period = period; if let Some((id, waker)) = self.id_and_waker.as_mut() { // Re-register the timer with the new timeout. - *id = Reactor::get().insert_timer(self.when, waker); + *id = Reactor::get().insert_timer(start, waker); } } } impl Drop for Timer { fn drop(&mut self) { - if let Some((id, _)) = self.id_and_waker.take() { + if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.take()) { // Deregister the timer from the reactor. - Reactor::get().remove_timer(self.when, id); + Reactor::get().remove_timer(when, id); } } } @@ -363,39 +431,44 @@ impl Future for Timer { impl Stream for Timer { type Item = Instant; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Check if the timer has already fired. - if Instant::now() >= self.when { - if let Some((id, _)) = self.id_and_waker.take() { - // Deregister the timer from the reactor. - Reactor::get().remove_timer(self.when, id); - } - let when = self.when; - if let Some(next) = when.checked_add(self.period) { - self.when = next; - // Register the timer in the reactor. - let id = Reactor::get().insert_timer(self.when, cx.waker()); - self.id_and_waker = Some((id, cx.waker().clone())); - } - return Poll::Ready(Some(when)); - } else { - match &self.id_and_waker { - None => { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + if let Some(ref mut when) = this.when { + // Check if the timer has already fired. + if Instant::now() >= *when { + if let Some((id, _)) = this.id_and_waker.take() { + // Deregister the timer from the reactor. + Reactor::get().remove_timer(*when, id); + } + let result_time = *when; + if let Some(next) = (*when).checked_add(this.period) { + *when = next; // Register the timer in the reactor. - let id = Reactor::get().insert_timer(self.when, cx.waker()); - self.id_and_waker = Some((id, cx.waker().clone())); + let id = Reactor::get().insert_timer(next, cx.waker()); + this.id_and_waker = Some((id, cx.waker().clone())); } - Some((id, w)) if !w.will_wake(cx.waker()) => { - // Deregister the timer from the reactor to remove the old waker. - Reactor::get().remove_timer(self.when, *id); - - // Register the timer in the reactor with the new waker. - let id = Reactor::get().insert_timer(self.when, cx.waker()); - self.id_and_waker = Some((id, cx.waker().clone())); + return Poll::Ready(Some(result_time)); + } else { + match &this.id_and_waker { + None => { + // Register the timer in the reactor. + let id = Reactor::get().insert_timer(*when, cx.waker()); + this.id_and_waker = Some((id, cx.waker().clone())); + } + Some((id, w)) if !w.will_wake(cx.waker()) => { + // Deregister the timer from the reactor to remove the old waker. + Reactor::get().remove_timer(*when, *id); + + // Register the timer in the reactor with the new waker. + let id = Reactor::get().insert_timer(*when, cx.waker()); + this.id_and_waker = Some((id, cx.waker().clone())); + } + Some(_) => {} } - Some(_) => {} } } + Poll::Pending } }