diff --git a/tower/src/builder/mod.rs b/tower/src/builder/mod.rs index b89bef98e..7ca9b9297 100644 --- a/tower/src/builder/mod.rs +++ b/tower/src/builder/mod.rs @@ -229,6 +229,24 @@ impl ServiceBuilder { self.layer(crate::util::MapErrLayer::new(f)) } + /// Apply a function after the service, regardless of whether the future + /// succeeds or fails. + /// + /// This is similar to the [`map_response`] and [`map_err] functions, + /// except that the *same* function is invoked when the service's future + /// completes, whether it completes successfully or fails. This function + /// takes the `Result` returned by the service's future, and returns a + /// `Result`. + /// + /// See the documentation for the [`then` combinator] for details. + /// + /// [`then` combinator]: crate::util::ServiceExt::then + #[cfg(feature = "util")] + pub fn then(self, f: F) -> ServiceBuilder, L>> { + self.layer(crate::util::ThenLayer::new(f)) + } + + /// Obtains the underlying `Layer` implementation. pub fn into_inner(self) -> L { self.layer diff --git a/tower/src/util/map_result.rs b/tower/src/util/map_result.rs new file mode 100644 index 000000000..088ae2880 --- /dev/null +++ b/tower/src/util/map_result.rs @@ -0,0 +1,73 @@ +use futures_util::FutureExt; +use std::task::{Context, Poll}; +use tower_layer::Layer; +use tower_service::Service; + +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use futures_util::future::Map as MapResultFuture; + +/// Service returned by the [`map_result`] combinator. +/// +/// [`map_result`]: crate::util::ServiceExt::map_result +#[derive(Clone, Debug)] +pub struct MapResult { + inner: S, + f: F, +} + +/// A [`Layer`] that produces a [`MapResult`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Debug, Clone)] +pub struct MapResultLayer { + f: F, +} + +impl MapResult { + /// Creates a new `MapResult` service. + pub fn new(inner: S, f: F) -> Self { + MapResult { f, inner } + } +} + +impl Service for MapResult +where + S: Service, + Error: From, + F: FnOnce(Result) -> Result + Clone, +{ + type Response = Response; + type Error = Error; + type Future = MapResultFuture; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + #[inline] + fn call(&mut self, request: Request) -> Self::Future { + self.inner.call(request).map(self.f.clone()) + } +} + +impl MapResultLayer { + /// Creates a new [`MapResultLayer`] layer. + pub fn new(f: F) -> Self { + MapResultLayer { f } + } +} + +impl Layer for MapResultLayer +where + F: Clone, +{ + type Service = MapResult; + + fn layer(&self, inner: S) -> Self::Service { + MapResult { + f: self.f.clone(), + inner, + } + } +} diff --git a/tower/src/util/mod.rs b/tower/src/util/mod.rs index f4696df4c..4b2c42a08 100644 --- a/tower/src/util/mod.rs +++ b/tower/src/util/mod.rs @@ -8,10 +8,13 @@ mod future_service; mod map_err; mod map_request; mod map_response; +mod map_result; + mod oneshot; mod optional; mod ready; mod service_fn; +mod then; mod try_map_request; pub use self::{ @@ -21,14 +24,17 @@ pub use self::{ map_err::{MapErr, MapErrLayer}, map_request::{MapRequest, MapRequestLayer}, map_response::{MapResponse, MapResponseLayer}, + map_result::{MapResult, MapResultLayer}, oneshot::Oneshot, optional::Optional, ready::{Ready, ReadyAnd, ReadyOneshot}, service_fn::{service_fn, ServiceFn}, + then::{Then, ThenLayer}, try_map_request::{TryMapRequest, TryMapRequestLayer}, }; pub use self::call_all::{CallAll, CallAllUnordered}; +use std::future::Future; pub mod error { //! Error types @@ -41,6 +47,8 @@ pub mod future { pub use super::map_err::MapErrFuture; pub use super::map_response::MapResponseFuture; + pub use super::map_result::MapResultFuture; + pub use super::then::ThenFuture; pub use super::optional::future as optional; } @@ -220,6 +228,222 @@ pub trait ServiceExt: tower_service::Service { MapErr::new(self, f) } + /// Maps this service's result type (`Result`) + /// to a different value, regardless of whether the future succeeds or + /// fails. + /// + /// This is similar to the [`map_response`] and [`map_err] combinators, + /// except that the *same* function is invoked when the service's future + /// completes, whether it completes successfully or fails. This function + /// takes the `Result` returned by the service's future, and returns a + /// `Result`. + /// + /// Like the standard library's [`Result::and_then`], this method can be + /// used to implement control flow based on `Result` values. For example, it + /// may be used to implement error recovery, by turning some `Err` + /// responses from the service into `Ok` responses. Similarly, some + /// successful responses from the service could be rejected, by returning an + /// `Err` conditionally, depending on the value inside the `Ok`. Finally, + /// this method can also be used to implement behaviors that must run when a + /// service's future completes, regardless of whether it succeeded or failed. + /// + /// This method can be used to change the [`Response`] type of the service + /// into a different type. It can also be used to change the [`Error`] type + /// of the service. However, because the `map_result` function is not applied + /// to the errors returned by the service's [`poll_ready`] method, it must + /// be possible to convert the service's [`Error`] type into the error type + /// returned by the `map_result` function. This is trivial when the function + /// returns the same error type as the service, but in other cases, it can + /// be useful to use [`BoxError`] to erase differing error types. + /// + /// # Examples + /// + /// Recovering from certain errors: + /// + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # struct Record { + /// # pub name: String, + /// # pub age: u16 + /// # } + /// # #[derive(Debug)] + /// # enum DbError { + /// # Parse(std::num::ParseIntError), + /// # NoRecordsFound, + /// # } + /// # + /// # impl Service for DatabaseService { + /// # type Response = Vec; + /// # type Error = DbError; + /// # type Future = futures_util::future::Ready, DbError>>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }])) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service returning Result, DbError> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // If the database returns no records for the query, we just want an empty `Vec`. + /// let mut new_service = service.map_result(|result| match result { + /// // If the error indicates that no records matched the query, return an empty + /// // `Vec` instead. + /// Err(DbError::NoRecordsFound) => Ok(Vec::new()), + /// // Propagate all other responses (`Ok` and `Err`) unchanged + /// x => x, + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let name = new_service.call(id).await.unwrap(); + /// # }; + /// # } + /// ``` + /// + /// Rejecting some `Ok` responses: + /// + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # struct Record { + /// # pub name: String, + /// # pub age: u16 + /// # } + /// # type DbError = String; + /// # type AppError = String; + /// # + /// # impl Service for DatabaseService { + /// # type Response = Record; + /// # type Error = DbError; + /// # type Future = futures_util::future::Ready>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 })) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// use tower::BoxError; + /// + /// // A service returning Result + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // If the user is zero years old, return an error. + /// let mut new_service = service.map_result(|result| { + /// let record = result?; + /// + /// if record.age == 0 { + /// // Users must have been born to use our app! + /// let app_error = AppError::from("users cannot be 0 years old!"); + /// + /// // Box the error to erase its type (as it can be an `AppError` + /// // *or* the inner service's `DbError`). + /// return Err(BoxError::from(app_error)); + /// } + /// + /// // Otherwise, return the record. + /// Ok(record) + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let record = new_service + /// .ready_and() + /// .call(id) + /// .await + /// .unwrap(); + /// # }; + /// # } + /// ``` + /// + /// Performing an action that must be run for both successes and failures: + /// + /// ``` + /// # use std::convert::TryFrom; + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # impl Service for DatabaseService { + /// # type Response = String; + /// # type Error = u8; + /// # type Future = futures_util::future::Ready>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(String::new())) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service returning Result + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Print a message whenever a query completes. + /// let mut new_service = service.map_result(|result| { + /// println!("query completed; success={}", result.is_ok()); + /// result + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let response = new_service.ready_and().call(id).await; + /// # }; + /// # } + /// ``` + /// + /// [`Error`]: crate::Service::Error + /// [`Response`]: crate::Service::Response + /// [`poll_ready`]: crate::Service::poll_ready + /// [`BoxError`]: crate::BoxError + fn map_result(self, f: F) -> MapResult + where + Self: Sized, + Error: From, + F: FnOnce(Result) -> Result + Clone, + { + MapResult::new(self, f) + } + /// Composes a function *in front of* the service. /// /// This adapter produces a new service that passes each value through the @@ -332,6 +556,113 @@ pub trait ServiceExt: tower_service::Service { { TryMapRequest::new(self, f) } + + /// Composes an asynchronous function *after* this service. + /// + /// This takes a function or closure returning a future, and returns a new + /// `Service` that chains that function after this service's [`Future`]. The + /// new `Service`'s future will consist of this service's future, followed + /// by the future returned by calling the chained function with the future's + /// [`Output`] type. The chained function is called regardless of whether + /// this service's future completes with a successful response or with an + /// error. + /// + /// This method can be thought of as an equivalent to the [`futures` + /// crate]'s [`FutureExt::then`] combinator, but acting on `Service`s that + /// _return_ futures, rather than on an individual future. Similarly to that + /// combinator, `ServiceExt::then` can be used to implement asynchronous + /// error recovery, by calling some asynchronous function with errors + /// returned by this service. Alternatively, it may also be used to call a + /// fallible async function with the successful response of this service. + /// + /// This method can be used to change the [`Response`] type of the service + /// into a different type. It can also be used to change the [`Error`] type + /// of the service. However, because the `then` function is not applied + /// to the errors returned by the service's [`poll_ready`] method, it must + /// be possible to convert the service's [`Error`] type into the error type + /// returned by the `then` future. This is trivial when the function + /// returns the same error type as the service, but in other cases, it can + /// be useful to use [`BoxError`] to erase differing error types. + /// + /// # Examples + /// + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # type Record = (); + /// # type DbError = (); + /// # + /// # impl Service for DatabaseService { + /// # type Response = Record; + /// # type Error = DbError; + /// # type Future = futures_util::future::Ready>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(()))) + /// # } + /// # } + /// # + /// # fn main() { + /// // A service returning Result + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // An async function that attempts to recover from errors returned by the + /// // database. + /// async fn recover_from_error(error: DbError) -> Result { + /// // ... + /// # Ok(()) + /// } + /// # async { + /// + /// // If the database service returns an error, attempt to recover by + /// // calling `recover_from_error`. Otherwise, return the successful response. + /// let mut new_service = service.then(|result| async move { + /// match result { + /// Ok(record) => Ok(record), + /// Err(e) => recover_from_error(e).await + /// } + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let record = new_service + /// .ready_and() + /// .call(id) + /// .await + /// .unwrap(); + /// # }; + /// # } + /// ``` + /// + /// [`Future`]: crate::Service::Future + /// [`Output`]: std::future::Future::Output + /// [`futures` crate]: https://docs.rs/futures + /// [`FuturesExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then + /// [`Error`]: crate::Service::Error + /// [`Response`]: crate::Service::Response + /// [`poll_ready`]: crate::Service::poll_ready + /// [`BoxError`]: crate::BoxError + fn then(self, f: F) -> Then + where + Self: Sized, + Error: From, + F: FnOnce(Result) -> Fut + Clone, + Fut: Future>, + { + Then::new(self, f) + } } impl ServiceExt for T where T: tower_service::Service {} diff --git a/tower/src/util/then.rs b/tower/src/util/then.rs new file mode 100644 index 000000000..a2f46ffde --- /dev/null +++ b/tower/src/util/then.rs @@ -0,0 +1,77 @@ +use futures_util::FutureExt; +use std::{ + future::Future, + task::{Context, Poll}, +}; +use tower_layer::Layer; +use tower_service::Service; + +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use futures_util::future::Then as ThenFuture; + +/// Service returned by the [`then`] combinator. +/// +/// [`then`]: crate::util::ServiceExt::then +#[derive(Clone, Debug)] +pub struct Then { + inner: S, + f: F, +} + +/// A [`Layer`] that produces a [`Then`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Debug, Clone)] +pub struct ThenLayer { + f: F, +} + +impl Then { + /// Creates a new `Then` service. + pub fn new(inner: S, f: F) -> Self { + Then { f, inner } + } +} + +impl Service for Then +where + S: Service, + S::Error: Into, + F: FnOnce(Result) -> Fut + Clone, + Fut: Future>, +{ + type Response = Response; + type Error = Error; + type Future = ThenFuture; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + #[inline] + fn call(&mut self, request: Request) -> Self::Future { + self.inner.call(request).then(self.f.clone()) + } +} + +impl ThenLayer { + /// Creates a new [`ThenLayer`] layer. + pub fn new(f: F) -> Self { + ThenLayer { f } + } +} + +impl Layer for ThenLayer +where + F: Clone, +{ + type Service = Then; + + fn layer(&self, inner: S) -> Self::Service { + Then { + f: self.f.clone(), + inner, + } + } +}