diff --git a/.github/actions/check/action.yml b/.github/actions/check/action.yml deleted file mode 100644 index 9e754e6..0000000 --- a/.github/actions/check/action.yml +++ /dev/null @@ -1,20 +0,0 @@ -name: 'Check' -description: 'Check will do all essential checks' -inputs: - github_token: - description: "Github Token" - required: true -runs: - using: "composite" - steps: - - name: Format - uses: actions-rs/cargo@v1 - with: - command: fmt - args: --all -- --check - - - name: Clippy - uses: actions-rs/cargo@v1 - with: - command: clippy - args: --all-targets -- -D warnings diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 826d3ae..8c62540 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,6 +1,12 @@ name: CI -on: [push, pull_request] +on: + push: + branches: + - main + pull_request: + branches: + - main concurrency: group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} @@ -11,25 +17,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - uses: ./.github/actions/check - with: - github_token: ${{ secrets.GITHUB_TOKEN }} - - build: - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: - - ubuntu-latest - - macos-latest - - windows-latest - steps: - - uses: actions/checkout@v4 - - uses: Swatinem/rust-cache@v2 - - name: Build - uses: actions-rs/cargo@v1 - with: - command: build + - name: Format + run: cargo fmt --all -- --check + - name: Clippy + run: cargo clippy --all-features --all-targets -- -D warnings unit: runs-on: ${{ matrix.os }} @@ -41,23 +32,19 @@ jobs: - windows-latest steps: - uses: actions/checkout@v4 - - uses: Swatinem/rust-cache@v2 - name: Test - uses: actions-rs/cargo@v1 - with: - command: test - args: -- --nocapture + run: cargo test --all-features env: RUST_LOG: DEBUG RUST_BACKTRACE: full - + wasm-unit: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Install run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh - - run: wasm-pack test --node + - run: wasm-pack test --node --all-features env: RUST_LOG: DEBUG RUST_BACKTRACE: full diff --git a/Cargo.toml b/Cargo.toml index cc4d106..0aa6f05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,12 +8,18 @@ name = "backon" repository = "https://github.com/Xuanwo/backon" version = "0.4.4" +[features] +gloo-timers-sleep = ["dep:gloo-timers", "gloo-timers/futures"] +tokio-sleep = ["dep:tokio", "tokio/time"] + [dependencies] fastrand = "2.0.0" -tokio = { version = "1", features = ["time"] } + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tokio = { version = "1", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] -gloo-timers = { version = "0.3", features = ["futures"] } +gloo-timers = { version = "0.3", optional = true } [dev-dependencies] anyhow = "1" diff --git a/src/blocking_retry_with_context.rs b/src/blocking_retry_with_context.rs index 40e98ae..54eaa4d 100644 --- a/src/blocking_retry_with_context.rs +++ b/src/blocking_retry_with_context.rs @@ -14,7 +14,7 @@ pub trait BlockingRetryableWithContext< > { /// Generate a new retry - fn retry(self, builder: &B) -> BlockingRetry; + fn retry(self, builder: &B) -> BlockingRetryWithContext; } impl BlockingRetryableWithContext for F @@ -22,13 +22,13 @@ where B: BackoffBuilder, F: FnMut(Ctx) -> (Ctx, Result), { - fn retry(self, builder: &B) -> BlockingRetry { - BlockingRetry::new(self, builder.build()) + fn retry(self, builder: &B) -> BlockingRetryWithContext { + BlockingRetryWithContext::new(self, builder.build()) } } /// Retry struct generated by [`Retryable`]. -pub struct BlockingRetry< +pub struct BlockingRetryWithContext< B: Backoff, T, E, @@ -44,14 +44,14 @@ pub struct BlockingRetry< ctx: Option, } -impl BlockingRetry +impl BlockingRetryWithContext where B: Backoff, F: FnMut(Ctx) -> (Ctx, Result), { /// Create a new retry. fn new(f: F, backoff: B) -> Self { - BlockingRetry { + BlockingRetryWithContext { backoff, retryable: |_: &E| true, notify: |_: &E, _: Duration| {}, @@ -61,7 +61,7 @@ where } } -impl BlockingRetry +impl BlockingRetryWithContext where B: Backoff, F: FnMut(Ctx) -> (Ctx, Result), @@ -69,8 +69,8 @@ where NF: FnMut(&E, Duration), { /// Set the context for retrying. - pub fn context(self, context: Ctx) -> BlockingRetry { - BlockingRetry { + pub fn context(self, context: Ctx) -> BlockingRetryWithContext { + BlockingRetryWithContext { backoff: self.backoff, retryable: self.retryable, notify: self.notify, @@ -85,8 +85,8 @@ where pub fn when bool>( self, retryable: RN, - ) -> BlockingRetry { - BlockingRetry { + ) -> BlockingRetryWithContext { + BlockingRetryWithContext { backoff: self.backoff, retryable, notify: self.notify, @@ -101,8 +101,8 @@ where pub fn notify( self, notify: NN, - ) -> BlockingRetry { - BlockingRetry { + ) -> BlockingRetryWithContext { + BlockingRetryWithContext { backoff: self.backoff, retryable: self.retryable, notify, diff --git a/src/lib.rs b/src/lib.rs index 10685c6..a53c696 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -181,6 +181,14 @@ pub use retry::Retryable; mod retry_with_context; pub use retry_with_context::RetryableWithContext; +mod sleep; +pub use sleep::DefaultSleeper; +#[cfg(all(target_arch = "wasm32", feature = "gloo-timers-sleep"))] +pub use sleep::GlooTimersSleep; +pub use sleep::Sleeper; +#[cfg(all(not(target_arch = "wasm32"), feature = "tokio-sleep"))] +pub use sleep::TokioSleeper; + #[cfg(not(target_arch = "wasm32"))] mod blocking_retry; #[cfg(not(target_arch = "wasm32"))] diff --git a/src/retry.rs b/src/retry.rs index 31cfcfd..8257fb7 100644 --- a/src/retry.rs +++ b/src/retry.rs @@ -5,14 +5,10 @@ use std::task::Context; use std::task::Poll; use std::time::Duration; -#[cfg(not(target_arch = "wasm32"))] -use tokio::time::{sleep, Sleep}; - -#[cfg(target_arch = "wasm32")] -use gloo_timers::future::{sleep, TimeoutFuture as Sleep}; - use crate::backoff::BackoffBuilder; use crate::Backoff; +use crate::DefaultSleeper; +use crate::Sleeper; /// Retryable will add retry support for functions that produces a futures with results. /// @@ -76,6 +72,7 @@ pub struct Retry< E, Fut: Future>, FutureFn: FnMut() -> Fut, + SF: Sleeper = DefaultSleeper, RF = fn(&E) -> bool, NF = fn(&E, Duration), > { @@ -83,8 +80,9 @@ pub struct Retry< retryable: RF, notify: NF, future_fn: FutureFn, + sleep_fn: SF, - state: State, + state: State, } impl Retry @@ -94,25 +92,70 @@ where FutureFn: FnMut() -> Fut, { /// Create a new retry. + /// + /// This API is only available when `tokio-sleep` feature is enabled. fn new(future_fn: FutureFn, backoff: B) -> Self { Retry { backoff, retryable: |_: &E| true, notify: |_: &E, _: Duration| {}, future_fn, + sleep_fn: DefaultSleeper::default(), state: State::Idle, } } } -impl Retry +impl Retry where B: Backoff, Fut: Future>, FutureFn: FnMut() -> Fut, + SF: Sleeper, RF: FnMut(&E) -> bool, NF: FnMut(&E, Duration), { + /// Set the sleeper for retrying. + /// + /// If not specified, we use the default sleeper that enabled by feature flag. + /// + /// The sleeper should implement the [`Sleeper`] trait. The simplest way is to use a closure that returns a `Future`. + /// + /// ```no_run + /// use anyhow::Result; + /// use backon::ExponentialBuilder; + /// use backon::Retryable; + /// use std::future::ready; + /// + /// async fn fetch() -> Result { + /// Ok(reqwest::get("https://www.rust-lang.org") + /// .await? + /// .text() + /// .await?) + /// } + /// + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() -> Result<()> { + /// let content = fetch + /// .retry(&ExponentialBuilder::default()) + /// .sleep(|_| ready(())) + /// .await?; + /// println!("fetch succeeded: {}", content); + /// + /// Ok(()) + /// } + /// ``` + pub fn sleep(self, sleep_fn: SN) -> Retry { + Retry { + backoff: self.backoff, + retryable: self.retryable, + notify: self.notify, + future_fn: self.future_fn, + sleep_fn, + state: State::Idle, + } + } + /// Set the conditions for retrying. /// /// If not specified, we treat all errors as retryable. @@ -145,12 +188,13 @@ where pub fn when bool>( self, retryable: RN, - ) -> Retry { + ) -> Retry { Retry { backoff: self.backoff, retryable, notify: self.notify, future_fn: self.future_fn, + sleep_fn: self.sleep_fn, state: self.state, } } @@ -191,11 +235,12 @@ where pub fn notify( self, notify: NN, - ) -> Retry { + ) -> Retry { Retry { backoff: self.backoff, retryable: self.retryable, notify, + sleep_fn: self.sleep_fn, future_fn: self.future_fn, state: self.state, } @@ -209,19 +254,19 @@ where /// `tokio::time::Sleep` is a very struct that occupy 640B, so we wrap it /// into a `Pin>` to avoid this enum too large. #[derive(Default)] -enum State>> { +enum State>, SleepFut: Future> { #[default] Idle, Polling(Fut), - // TODO: we need to support other sleeper - Sleeping(Sleep), + Sleeping(SleepFut), } -impl Future for Retry +impl Future for Retry where B: Backoff, Fut: Future>, FutureFn: FnMut() -> Fut, + SF: Sleeper, RF: FnMut(&E) -> bool, NF: FnMut(&E, Duration), { @@ -259,7 +304,7 @@ where None => return Poll::Ready(Err(err)), Some(dur) => { (this.notify)(&err, dur); - this.state = State::Sleeping(sleep(dur)); + this.state = State::Sleeping(this.sleep_fn.sleep(dur)); continue; } } @@ -283,9 +328,9 @@ where } #[cfg(test)] +#[cfg(any(feature = "tokio-sleep", feature = "gloo-timers-sleep"))] mod tests { - use std::time::Duration; - + use std::{future::ready, time::Duration}; use tokio::sync::Mutex; #[cfg(target_arch = "wasm32")] @@ -312,6 +357,18 @@ mod tests { Ok(()) } + #[test] + async fn test_retry_with_sleep() -> anyhow::Result<()> { + let result = always_error + .retry(&ExponentialBuilder::default().with_min_delay(Duration::from_millis(1))) + .sleep(|_| ready(())) + .await; + + assert!(result.is_err()); + assert_eq!("test_query meets error", result.unwrap_err().to_string()); + Ok(()) + } + #[test] async fn test_retry_with_not_retryable_error() -> anyhow::Result<()> { let error_times = Mutex::new(0); diff --git a/src/retry_with_context.rs b/src/retry_with_context.rs index 3b6533c..40ec877 100644 --- a/src/retry_with_context.rs +++ b/src/retry_with_context.rs @@ -5,14 +5,10 @@ use std::task::Context; use std::task::Poll; use std::time::Duration; -#[cfg(not(target_arch = "wasm32"))] -use tokio::time::{sleep, Sleep}; - -#[cfg(target_arch = "wasm32")] -use gloo_timers::future::{sleep, TimeoutFuture as Sleep}; - use crate::backoff::BackoffBuilder; use crate::Backoff; +use crate::DefaultSleeper; +use crate::Sleeper; /// RetryableWithContext will add retry support for functions that produces a futures with results /// and context. @@ -88,7 +84,7 @@ pub trait RetryableWithContext< > { /// Generate a new retry - fn retry(self, builder: &B) -> Retry; + fn retry(self, builder: &B) -> RetryWithContext; } impl RetryableWithContext for FutureFn @@ -97,19 +93,20 @@ where Fut: Future)>, FutureFn: FnMut(Ctx) -> Fut, { - fn retry(self, builder: &B) -> Retry { - Retry::new(self, builder.build()) + fn retry(self, builder: &B) -> RetryWithContext { + RetryWithContext::new(self, builder.build()) } } /// Retry struct generated by [`Retryable`]. -pub struct Retry< +pub struct RetryWithContext< B: Backoff, T, E, Ctx, Fut: Future)>, FutureFn: FnMut(Ctx) -> Fut, + SF: Sleeper = DefaultSleeper, RF = fn(&E) -> bool, NF = fn(&E, Duration), > { @@ -117,11 +114,12 @@ pub struct Retry< retryable: RF, notify: NF, future_fn: FutureFn, + sleep_fn: SF, - state: State, + state: State, } -impl Retry +impl RetryWithContext where B: Backoff, Fut: Future)>, @@ -133,31 +131,60 @@ where /// /// `context` must be set by `context` method before calling `await`. fn new(future_fn: FutureFn, backoff: B) -> Self { - Retry { + RetryWithContext { backoff, retryable: |_: &E| true, notify: |_: &E, _: Duration| {}, future_fn, + sleep_fn: DefaultSleeper::default(), state: State::Idle(None), } } } -impl Retry +impl + RetryWithContext where B: Backoff, Fut: Future)>, FutureFn: FnMut(Ctx) -> Fut, + SF: Sleeper, RF: FnMut(&E) -> bool, NF: FnMut(&E, Duration), { + /// Set the sleeper for retrying. + /// + /// If not specified, we use the default sleeper that enabled by feature flag. + pub fn sleep( + self, + sleep_fn: SN, + ) -> RetryWithContext { + assert!( + matches!(self.state, State::Idle(None)), + "sleep must be set before context" + ); + + RetryWithContext { + backoff: self.backoff, + retryable: self.retryable, + notify: self.notify, + future_fn: self.future_fn, + sleep_fn, + state: State::Idle(None), + } + } + /// Set the context for retrying. - pub fn context(self, context: Ctx) -> Retry { - Retry { + pub fn context( + self, + context: Ctx, + ) -> RetryWithContext { + RetryWithContext { backoff: self.backoff, retryable: self.retryable, notify: self.notify, future_fn: self.future_fn, + sleep_fn: self.sleep_fn, state: State::Idle(Some(context)), } } @@ -194,12 +221,13 @@ where pub fn when bool>( self, retryable: RN, - ) -> Retry { - Retry { + ) -> RetryWithContext { + RetryWithContext { backoff: self.backoff, retryable, notify: self.notify, future_fn: self.future_fn, + sleep_fn: self.sleep_fn, state: self.state, } } @@ -240,35 +268,33 @@ where pub fn notify( self, notify: NN, - ) -> Retry { - Retry { + ) -> RetryWithContext { + RetryWithContext { backoff: self.backoff, retryable: self.retryable, notify, future_fn: self.future_fn, + sleep_fn: self.sleep_fn, state: self.state, } } } /// State maintains internal state of retry. -/// -/// # Notes -/// -/// `tokio::time::Sleep` is a very struct that occupy 640B, so we wrap it -/// into a `Pin>` to avoid this enum too large. -enum State)>> { +enum State)>, SleepFut: Future> { Idle(Option), Polling(Fut), // TODO: we need to support other sleeper - Sleeping((Option, Sleep)), + Sleeping((Option, SleepFut)), } -impl Future for Retry +impl Future + for RetryWithContext where B: Backoff, Fut: Future)>, FutureFn: FnMut(Ctx) -> Fut, + SF: Sleeper, RF: FnMut(&E) -> bool, NF: FnMut(&E, Duration), { @@ -308,7 +334,8 @@ where None => return Poll::Ready((ctx, Err(err))), Some(dur) => { (this.notify)(&err, dur); - this.state = State::Sleeping((Some(ctx), sleep(dur))); + this.state = + State::Sleeping((Some(ctx), this.sleep_fn.sleep(dur))); continue; } } diff --git a/src/sleep.rs b/src/sleep.rs new file mode 100644 index 0000000..d694cb2 --- /dev/null +++ b/src/sleep.rs @@ -0,0 +1,71 @@ +use std::{ + future::{Future, Ready}, + time::Duration, +}; + +/// Sleeper is used to generate a future that resolves after a specified duration. +pub trait Sleeper { + /// The future returned by the `sleep` method. + type Sleep: Future; + + /// Generate a future that resolves after a specified duration. + fn sleep(&self, dur: Duration) -> Self::Sleep; +} + +/// The default implementation of `Sleeper` based on enabled feature flag. +#[cfg(all(not(feature = "tokio-sleep"), not(feature = "gloo-timers-sleep")))] +pub type DefaultSleeper = (); +/// The default implementation of `Sleeper` based on enabled feature flag. +/// +/// Under `tokio-sleep` feature, it uses `tokio::time::sleep`. +#[cfg(all(not(target_arch = "wasm32"), feature = "tokio-sleep"))] +pub type DefaultSleeper = TokioSleeper; +/// The default implementation of `Sleeper` based on enabled feature flag. +/// +/// Under `gloo-timers-sleep` feature, it uses `gloo_timers::sleep::sleep`. +#[cfg(all(target_arch = "wasm32", feature = "gloo-timers-sleep"))] +pub type DefaultSleeper = GlooTimersSleep; + +impl Sleeper for () { + type Sleep = Ready<()>; + + fn sleep(&self, _: Duration) -> Self::Sleep { + panic!("no sleeper has been configured, consider enabling features or provide a custom implementation") + } +} + +impl Fut, Fut: Future> Sleeper for F { + type Sleep = Fut; + + fn sleep(&self, dur: Duration) -> Self::Sleep { + self(dur) + } +} + +/// The default implementation of `Sleeper` using `tokio::time::sleep`. +#[cfg(all(not(target_arch = "wasm32"), feature = "tokio-sleep"))] +#[derive(Clone, Copy, Debug, Default)] +pub struct TokioSleeper; + +#[cfg(all(not(target_arch = "wasm32"), feature = "tokio-sleep"))] +impl Sleeper for TokioSleeper { + type Sleep = tokio::time::Sleep; + + fn sleep(&self, dur: Duration) -> Self::Sleep { + tokio::time::sleep(dur) + } +} + +/// The default implementation of `Sleeper` using `gloo_timers::future::sleep`. +#[cfg(all(target_arch = "wasm32", feature = "gloo-timers-sleep"))] +#[derive(Clone, Copy, Debug, Default)] +pub struct GlooTimersSleep; + +#[cfg(all(target_arch = "wasm32", feature = "gloo-timers-sleep"))] +impl Sleeper for GlooTimersSleep { + type Sleep = gloo_timers::future::TimeoutFuture; + + fn sleep(&self, dur: Duration) -> Self::Sleep { + gloo_timers::future::sleep(dur) + } +}