diff --git a/Cargo.toml b/Cargo.toml index cb34fbb..caf1f32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,9 @@ description = "Backoff policies" [dependencies] rand = "0.8.5" +futures = "0.3.21" +tokio = { version = "1.17.0", features = ["time"] } +pin-project = "1.0.10" [dev-dependencies] tokio = { version = "1.17.0", features = ["full"] } diff --git a/README.md b/README.md index 9247b11..12faf29 100644 --- a/README.md +++ b/README.md @@ -4,29 +4,23 @@ The opposite backoff implementation of the popular [backoff](https://docs.rs/bac - Newer: developed by Rust edition 2021 and latest stable. - Cleaner: Iterator based abstraction, easy to use, customization friendly. -- Smaller: Focused on backoff implementation, no need to play with runtime specific features. +- Easier: Trait based implementations, works like a native function provided by closures. ## Quick Start ```rust +use backon::Retryable; use backon::ExponentialBackoff; use anyhow::Result; +async fn fetch() -> Result { + Ok(reqwest::get("https://www.rust-lang.org").await?.text().await?) +} + #[tokio::main] async fn main() -> Result<()> { - for delay in ExponentialBackoff::default() { - let x = reqwest::get("https://www.rust-lang.org").await?.text().await; - match x { - Ok(v) => { - println!("Successfully fetched"); - break; - }, - Err(_) => { - tokio::time::sleep(delay).await; - continue - } - }; - } + let content = fetch.retry(ExponentialBackoff::default()).await?; + println!("fetch succeeded: {}", contet); Ok(()) } diff --git a/src/backoff.rs b/src/backoff.rs new file mode 100644 index 0000000..353f453 --- /dev/null +++ b/src/backoff.rs @@ -0,0 +1,8 @@ +use std::time::Duration; + +/// Backoff is an [`Iterator`] that returns [`Duration`]. +/// +/// - `Some(Duration)` means caller need to `sleep(Duration)` and retry the same request +/// - `None` means we have reaching the limits, caller needs to return current error instead. +pub trait Backoff: Iterator + Clone {} +impl Backoff for T where T: Iterator + Clone {} diff --git a/src/constant.rs b/src/constant.rs index e92cf0a..9016888 100644 --- a/src/constant.rs +++ b/src/constant.rs @@ -1,5 +1,31 @@ use std::time::Duration; +/// ConstantBackoff provides backoff with constant delay and limited times. +/// +/// # Default +/// +/// - delay: 1s +/// - max times: 3 +/// +/// # Examples +/// +/// ```no_run +/// use backon::Retryable; +/// use backon::ConstantBackoff; +/// use anyhow::Result; +/// +/// async fn fetch() -> Result { +/// Ok(reqwest::get("https://www.rust-lang.org").await?.text().await?) +/// } +/// +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// let content = fetch.retry(ConstantBackoff::default()).await?; +/// println!("fetch succeeded: {}", content); +/// +/// Ok(()) +/// } +/// ``` pub struct ConstantBackoff { delay: Duration, max_times: Option, @@ -18,11 +44,13 @@ impl Default for ConstantBackoff { } impl ConstantBackoff { + /// Set delay of current backoff. pub fn with_delay(mut self, delay: Duration) -> Self { self.delay = delay; self } + /// Set max times of current backoff. pub fn with_max_times(mut self, max_times: usize) -> Self { self.max_times = Some(max_times); self diff --git a/src/exponential.rs b/src/exponential.rs index 2ede9b7..7dd47a7 100644 --- a/src/exponential.rs +++ b/src/exponential.rs @@ -3,32 +3,34 @@ use std::time::Duration; /// Exponential backoff implementation. /// +/// # Default +/// +/// - jitter: false +/// - factor: 2 +/// - min_delay: 1s +/// - max_delay: 60s +/// - max_times: 3 +/// /// # Examples /// -/// ``` +/// ```no_run +/// use backon::Retryable; /// use backon::ExponentialBackoff; /// use anyhow::Result; /// +/// async fn fetch() -> Result { +/// Ok(reqwest::get("https://www.rust-lang.org").await?.text().await?) +/// } +/// /// #[tokio::main] /// async fn main() -> Result<()> { -/// for delay in ExponentialBackoff::default() { -/// let x = reqwest::get("https://www.rust-lang.org").await?.text().await; -/// match x { -/// Ok(v) => { -/// println!("Successfully fetched"); -/// break; -/// }, -/// Err(_) => { -/// tokio::time::sleep(delay).await; -/// continue -/// } -/// }; -/// } +/// let content = fetch.retry(ExponentialBackoff::default()).await?; +/// println!("fetch succeeded: {}", content); /// /// Ok(()) /// } /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ExponentialBackoff { jitter: bool, factor: f32, @@ -56,11 +58,20 @@ impl Default for ExponentialBackoff { } impl ExponentialBackoff { + /// Set jitter of current backoff. + /// + /// If jitter is enabled, ExponentialBackoff will add a random jitter in `[0, min_delay) + /// to current delay. pub fn with_jitter(mut self) -> Self { self.jitter = true; self } + /// Set factor of current backoff. + /// + /// # Panics + /// + /// This function will panic if input factor smaller than `1.0`. pub fn with_factor(mut self, factor: f32) -> Self { debug_assert!(factor > 1.0, "invalid factor that lower than 1"); @@ -68,16 +79,23 @@ impl ExponentialBackoff { self } + /// Set min_delay of current backoff. pub fn with_min_delay(mut self, min_delay: Duration) -> Self { self.min_delay = min_delay; self } + /// Set max_delay of current backoff. + /// + /// Delay will not increasing if current delay is larger than max_delay. pub fn with_max_delay(mut self, max_delay: Duration) -> Self { self.max_delay = Some(max_delay); self } + /// Set max_times of current backoff. + /// + /// Backoff will return `None` if max times is reaching. pub fn with_max_times(mut self, max_times: usize) -> Self { self.max_times = Some(max_times); self diff --git a/src/lib.rs b/src/lib.rs index 04de5ee..07b8103 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,46 @@ +//! backon intends to provide an opposite backoff implementation of the popular [backoff](https://docs.rs/backoff). +//! +//! - Newer: developed by Rust edition 2021 and latest stable. +//! - Cleaner: Iterator based abstraction, easy to use, customization friendly. +//! - Easier: Trait based implementations, works like a native function provided by closures. +//! +//! # Backoff +//! +//! Any types that implements `Iterator` can be used as backoff. +//! +//! backon also provides backoff implementations with reasonable defaults: +//! +//! - [`ConstantBackoff`]: backoff with constant delay and limited times. +//! - [`ExponentialBackoff`]: backoff with exponential delay, also provides jitter supports. +//! +//! # Examples +//! +//! ```no_run +//! use backon::Retryable; +//! use backon::ExponentialBackoff; +//! use anyhow::Result; +//! +//! async fn fetch() -> Result { +//! Ok(reqwest::get("https://www.rust-lang.org").await?.text().await?) +//! } +//! +//! #[tokio::main] +//! async fn main() -> Result<()> { +//! let content = fetch.retry(ExponentialBackoff::default()).await?; +//! println!("fetch succeeded: {}", content); +//! +//! Ok(()) +//! } +//! ``` + +mod backoff; +pub use backoff::Backoff; + mod constant; pub use constant::ConstantBackoff; + mod exponential; pub use exponential::ExponentialBackoff; -mod policy; -pub use policy::Policy; + +mod retry; +pub use retry::Retryable; diff --git a/src/policy.rs b/src/policy.rs deleted file mode 100644 index 856354c..0000000 --- a/src/policy.rs +++ /dev/null @@ -1,4 +0,0 @@ -use std::time::Duration; - -pub trait Policy: Iterator {} -impl Policy for T where T: Iterator {} diff --git a/src/retry.rs b/src/retry.rs new file mode 100644 index 0000000..802cb53 --- /dev/null +++ b/src/retry.rs @@ -0,0 +1,179 @@ +use futures::ready; +use pin_project::pin_project; + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::Backoff; + +/// Retryable will add retry support for functions that produces a futures with results. +/// +/// That means all types that implement `FnMut() -> impl Future>` +/// will be able to use `retry`. +/// +/// For example: +/// +/// - Functions without extra args: +/// +/// ```ignore +/// async fn fetch() -> Result { +/// Ok(reqwest::get("https://www.rust-lang.org").await?.text().await?) +/// } +/// ``` +/// +/// - Closures +/// +/// ```ignore +/// || async { +/// let x = reqwest::get("https://www.rust-lang.org") +/// .await? +/// .text() +/// .await?; +/// +/// Err(anyhow::anyhow!(x)) +/// } +/// ``` +/// +/// # Example +/// +/// ```no_run +/// use backon::Retryable; +/// use backon::ExponentialBackoff; +/// use anyhow::Result; +/// +/// async fn fetch() -> Result { +/// Ok(reqwest::get("https://www.rust-lang.org").await?.text().await?) +/// } +/// +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// let content = fetch.retry(ExponentialBackoff::default()).await?; +/// println!("fetch succeeded: {}", content); +/// +/// Ok(()) +/// } +/// ``` +pub trait Retryable< + P: Backoff, + T, + E, + Fut: Future>, + FutureFn: FnMut() -> Fut, +> +{ + fn retry(self, policy: P) -> Retry; +} + +impl Retryable for FutureFn +where + P: Backoff, + Fut: Future>, + FutureFn: FnMut() -> Fut, +{ + fn retry(self, policy: P) -> Retry { + Retry { + backoff: policy, + error_fn: |_: &E| true, + future_fn: self, + state: State::Idle, + } + } +} + +#[pin_project] +pub struct Retry< + P: Backoff, + T, + E, + Fut: Future>, + FutureFn: FnMut() -> Fut, +> { + backoff: P, + error_fn: fn(&E) -> bool, + future_fn: FutureFn, + + #[pin] + state: State, +} + +#[pin_project(project = StateProject)] +enum State>> { + Idle, + + Polling(#[pin] Fut), + // TODO: we need to support other sleeper + Sleeping(#[pin] Pin>), +} + +impl Default for State +where + Fut: Future>, +{ + fn default() -> Self { + State::Idle + } +} + +impl Future for Retry +where + P: Backoff, + Fut: Future>, + FutureFn: FnMut() -> Fut, +{ + type Output = std::result::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + let state = this.state.as_mut().project(); + match state { + StateProject::Idle => { + let fut = (this.future_fn)(); + // this.state = State::Polling(fut); + this.state.set(State::Polling(fut)); + continue; + } + StateProject::Polling(fut) => match ready!(fut.poll(cx)) { + Ok(v) => return Poll::Ready(Ok(v)), + Err(err) => match this.backoff.next() { + None => return Poll::Ready(Err(err)), + Some(dur) => { + this.state + .set(State::Sleeping(Box::pin(tokio::time::sleep(dur)))); + continue; + } + }, + }, + StateProject::Sleeping(sl) => { + ready!(sl.poll(cx)); + this.state.set(State::Idle); + continue; + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + use crate::exponential::ExponentialBackoff; + + async fn always_error() -> anyhow::Result<()> { + Err(anyhow::anyhow!("test_query meets error")) + } + + #[tokio::test] + async fn test_retry_x() -> anyhow::Result<()> { + let result = always_error + .retry(ExponentialBackoff::default().with_min_delay(Duration::from_millis(1))) + .await; + + assert!(result.is_err()); + assert_eq!("test_query meets error", result.unwrap_err().to_string()); + Ok(()) + } +}