Skip to content

Commit

Permalink
Improve timer functionality (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
notgull authored Aug 21, 2022
1 parent e361484 commit a4296c9
Showing 1 changed file with 117 additions and 44 deletions.
161 changes: 117 additions & 44 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ fn duration_max() -> Duration {
Duration::new(std::u64::MAX, 1_000_000_000 - 1)
}

fn instant_max() -> Instant {
// In order to ensure this point in time is never reached, it
// is put 30 years into the future.
Instant::now() + Duration::from_secs(86400 * 365 * 30)
}

/// A future or stream that emits timed events.
///
/// Timers are futures that output a single [`Instant`] when they fire.
Expand Down Expand Up @@ -139,13 +145,57 @@ pub struct Timer {
id_and_waker: Option<(usize, Waker)>,

/// The next instant at which this timer fires.
when: Instant,
///
/// If this timer is a blank timer, this value is None. If the timer
/// must be set, this value contains the next instant at which the
/// timer must fire.
when: Option<Instant>,

/// The period.
period: Duration,
}

impl Timer {
/// Creates a timer that will never fire.
///
/// # Examples
///
/// This function may also be useful for creating a function with an optional timeout.
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_io::Timer;
/// use futures_lite::prelude::*;
/// use std::time::Duration;
///
/// async fn run_with_timeout(timeout: Option<Duration>) {
/// let timer = timeout
/// .map(|timeout| Timer::after(timeout))
/// .unwrap_or_else(Timer::never);
///
/// run_lengthy_operation().or(timer).await;
/// }
/// # // Note that since a Timer as a Future returns an Instant,
/// # // this function needs to return an Instant to be used
/// # // in "or".
/// # async fn run_lengthy_operation() -> std::time::Instant {
/// # std::time::Instant::now()
/// # }
///
/// // Times out after 5 seconds.
/// run_with_timeout(Some(Duration::from_secs(5))).await;
/// // Does not time out.
/// run_with_timeout(None).await;
/// # });
/// ```
pub fn never() -> Timer {
Timer {
id_and_waker: None,
when: None,
period: duration_max(),
}
}

/// Creates a timer that emits an event once after the given duration of time.
///
/// # Examples
Expand All @@ -159,7 +209,11 @@ impl Timer {
/// # });
/// ```
pub fn after(duration: Duration) -> Timer {
Timer::at(Instant::now() + duration)
Timer::at(
Instant::now()
.checked_add(duration)
.unwrap_or_else(instant_max),
)
}

/// Creates a timer that emits an event once at the given time instant.
Expand Down Expand Up @@ -196,7 +250,12 @@ impl Timer {
/// # });
/// ```
pub fn interval(period: Duration) -> Timer {
Timer::interval_at(Instant::now() + period, period)
Timer::interval_at(
Instant::now()
.checked_add(period)
.unwrap_or_else(instant_max),
period,
)
}

/// Creates a timer that emits events periodically, starting at `start`.
Expand All @@ -217,7 +276,7 @@ impl Timer {
pub fn interval_at(start: Instant, period: Duration) -> Timer {
Timer {
id_and_waker: None,
when: start,
when: Some(start),
period,
}
}
Expand All @@ -240,7 +299,11 @@ impl Timer {
/// # });
/// ```
pub fn set_after(&mut self, duration: Duration) {
self.set_at(Instant::now() + duration);
self.set_at(
Instant::now()
.checked_add(duration)
.unwrap_or_else(instant_max),
);
}

/// Sets the timer to emit an event once at the given time instant.
Expand All @@ -264,17 +327,17 @@ impl Timer {
/// # });
/// ```
pub fn set_at(&mut self, instant: Instant) {
if let Some((id, _)) = self.id_and_waker.as_ref() {
if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.as_ref()) {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, *id);
Reactor::get().remove_timer(when, *id);
}

// Update the timeout.
self.when = instant;
self.when = Some(instant);

if let Some((id, waker)) = self.id_and_waker.as_mut() {
// Re-register the timer with the new timeout.
*id = Reactor::get().insert_timer(self.when, waker);
*id = Reactor::get().insert_timer(instant, waker);
}
}

Expand All @@ -299,7 +362,12 @@ impl Timer {
/// # });
/// ```
pub fn set_interval(&mut self, period: Duration) {
self.set_interval_at(Instant::now() + period, period);
self.set_interval_at(
Instant::now()
.checked_add(period)
.unwrap_or_else(instant_max),
period,
);
}

/// Sets the timer to emit events periodically, starting at `start`.
Expand All @@ -324,26 +392,26 @@ impl Timer {
/// # });
/// ```
pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
if let Some((id, _)) = self.id_and_waker.as_ref() {
if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.as_ref()) {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, *id);
Reactor::get().remove_timer(when, *id);
}

self.when = start;
self.when = Some(start);
self.period = period;

if let Some((id, waker)) = self.id_and_waker.as_mut() {
// Re-register the timer with the new timeout.
*id = Reactor::get().insert_timer(self.when, waker);
*id = Reactor::get().insert_timer(start, waker);
}
}
}

impl Drop for Timer {
fn drop(&mut self) {
if let Some((id, _)) = self.id_and_waker.take() {
if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.take()) {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
Reactor::get().remove_timer(when, id);
}
}
}
Expand All @@ -363,39 +431,44 @@ impl Future for Timer {
impl Stream for Timer {
type Item = Instant;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Check if the timer has already fired.
if Instant::now() >= self.when {
if let Some((id, _)) = self.id_and_waker.take() {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
}
let when = self.when;
if let Some(next) = when.checked_add(self.period) {
self.when = next;
// Register the timer in the reactor.
let id = Reactor::get().insert_timer(self.when, cx.waker());
self.id_and_waker = Some((id, cx.waker().clone()));
}
return Poll::Ready(Some(when));
} else {
match &self.id_and_waker {
None => {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

if let Some(ref mut when) = this.when {
// Check if the timer has already fired.
if Instant::now() >= *when {
if let Some((id, _)) = this.id_and_waker.take() {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(*when, id);
}
let result_time = *when;
if let Some(next) = (*when).checked_add(this.period) {
*when = next;
// Register the timer in the reactor.
let id = Reactor::get().insert_timer(self.when, cx.waker());
self.id_and_waker = Some((id, cx.waker().clone()));
let id = Reactor::get().insert_timer(next, cx.waker());
this.id_and_waker = Some((id, cx.waker().clone()));
}
Some((id, w)) if !w.will_wake(cx.waker()) => {
// Deregister the timer from the reactor to remove the old waker.
Reactor::get().remove_timer(self.when, *id);

// Register the timer in the reactor with the new waker.
let id = Reactor::get().insert_timer(self.when, cx.waker());
self.id_and_waker = Some((id, cx.waker().clone()));
return Poll::Ready(Some(result_time));
} else {
match &this.id_and_waker {
None => {
// Register the timer in the reactor.
let id = Reactor::get().insert_timer(*when, cx.waker());
this.id_and_waker = Some((id, cx.waker().clone()));
}
Some((id, w)) if !w.will_wake(cx.waker()) => {
// Deregister the timer from the reactor to remove the old waker.
Reactor::get().remove_timer(*when, *id);

// Register the timer in the reactor with the new waker.
let id = Reactor::get().insert_timer(*when, cx.waker());
this.id_and_waker = Some((id, cx.waker().clone()));
}
Some(_) => {}
}
Some(_) => {}
}
}

Poll::Pending
}
}
Expand Down

0 comments on commit a4296c9

Please sign in to comment.