From d2a0f29833500cd2651af0adaa36f1254b3693ed Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 28 Dec 2020 14:31:49 -0800 Subject: [PATCH] util: Add `then` combinator Currently, `ServiceExt` and `ServiceBuilder` provide combinators for mapping successful responses to other responses, and mapping errors to other errors, but don't provide a way to map between `Ok` and `Err` results. For completeness, this branch adds a new `then` combinator, which takes a function from `Result` to `Result` and applies it when the service's future completes. This can be used for recovering from some errors or for rejecting some `Ok` responses. It can also be used for behaviors that should be run when a service's future completes regardless of whether it completed successfully or not. Depends on #499 --- tower/src/builder/mod.rs | 17 ++++ tower/src/util/mod.rs | 211 +++++++++++++++++++++++++++++++++++++++ tower/src/util/then.rs | 72 +++++++++++++ 3 files changed, 300 insertions(+) create mode 100644 tower/src/util/then.rs diff --git a/tower/src/builder/mod.rs b/tower/src/builder/mod.rs index dc9c63f5e..ec6e199b2 100644 --- a/tower/src/builder/mod.rs +++ b/tower/src/builder/mod.rs @@ -229,6 +229,23 @@ impl ServiceBuilder { self.layer(crate::util::MapErrLayer::new(f)) } + /// Apply a function after the service, regardless of whether the future + /// succeeds or fails. + /// + /// This is similar to the [`map_response`] and [`map_err] functions, + /// except that the *same* function is invoked when the service's future + /// completes, whether it completes successfully or fails. This function + /// takes the `Result` returned by the service's future, and returns a + /// `Result`. + /// + /// See the documentation for the [`then` combinator] for details. + /// + /// [`then` combinator]: crate::util::ServiceExt::then + #[cfg(feature = "util")] + pub fn then(self, f: F) -> ServiceBuilder, L>> { + self.layer(crate::util::ThenLayer::new(f)) + } + /// Obtains the underlying `Layer` implementation. pub fn into_inner(self) -> L { self.layer diff --git a/tower/src/util/mod.rs b/tower/src/util/mod.rs index 37c8947c3..6bc6c16e3 100644 --- a/tower/src/util/mod.rs +++ b/tower/src/util/mod.rs @@ -10,6 +10,7 @@ mod oneshot; mod optional; mod ready; mod service_fn; +mod then; mod try_map_request; pub use self::{ @@ -22,6 +23,7 @@ pub use self::{ optional::Optional, ready::{Ready, ReadyAnd, ReadyOneshot}, service_fn::{service_fn, ServiceFn}, + then::{Then, ThenFuture, ThenLayer}, try_map_request::{TryMapRequest, TryMapRequestLayer}, }; @@ -327,6 +329,215 @@ pub trait ServiceExt: tower_service::Service { { TryMapRequest::new(self, f) } + + /// Composes a function after the service, regardless of whether the future + /// succeeds or fails. + /// + /// This is similar to the [`map_response`] and [`map_err] combinators, + /// except that the *same* function is invoked when the service's future + /// completes, whether it completes successfully or fails. This function + /// takes the `Result` returned by the service's future, and returns a + /// `Result`. + /// + /// Like the standard library's [`Result::and_then`], this method can be + /// used to implement control flow based on `Result` values. For example, it + /// may be used to implement error recovery, by turning some `Err` + /// responses from the service into `Ok` responses. Similarly, some + /// successful responses from the service could be rejected, by returning an + /// `Err` conditionally, depending on the value inside the `Ok`. Finally, + /// this method can also be used to implement behaviors that must run when a + /// service's future completes, regardless of whether it succeeded or failed. + /// + /// This method can be used to change the [`Response`] type of the service + /// into a different type. It can also be used to change the [`Error`] type + /// of the service. However, because the `then` function is not applied + /// to the errors returned by the service's [`poll_ready`] method, it must + /// be possible to convert the service's [`Error`] type into the error type + /// returned by the `then` function. This is trivial when the function + /// returns the same error type as the service, but in other cases, it can + /// be useful to use [`BoxError`] to erase differing error types. + /// + /// # Examples + /// + /// Recovering from certain errors: + /// + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # struct Record { + /// # pub name: String, + /// # pub age: u16 + /// # } + /// # enum DbError { + /// # Parse(std::num::ParseIntError) + /// # NoRecordsFound, + /// # } + /// # + /// # impl Service for DatabaseService { + /// # type Response = Record; + /// # type Error = DbError; + /// # type Future = futures_util::future::Ready, DbError>>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }])) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service returning Result, DbError> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // If the database returns no records for the query, we just want an empty `Vec`. + /// let mut new_service = service.then(|result| match result { + /// // If the error indicates that no records matched the query, return an empty + /// // `Vec` instead. + /// Err(DbError::NoRecordsFound) => Ok(Vec::new()), + /// // Propagate all other responses (`Ok` and `Err`) unchanged + /// x => x, + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let name = new_service.call(id).await.unwrap(); + /// # }; + /// # } + /// ``` + /// + /// Rejecting some `Ok` responses: + /// + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # struct Record { + /// # pub name: String, + /// # pub age: u16 + /// # } + /// # type DbError = String; + /// # type AppError = String; + /// # + /// # impl Service for DatabaseService { + /// # type Response = Record; + /// # type Error = DbError; + /// # type Future = futures_util::future::Ready>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 })) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// use tower::BoxError; + /// + /// // A service returning Result + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // If the user is zero years old, return an error. + /// let mut new_service = service.then(|result| { + /// let record = result?; + /// + /// if record.age == 0 { + /// // Users must have been born to use our app! + /// let app_error = AppError::from("users cannot be 0 years old!"); + /// + /// // Box the error to erase its type (as it can be an `AppError` + /// // *or* the inner service's `DbError`). + /// return Err(BoxError::from(app_error)); + /// } + /// + /// // Otherwise, return the record. + /// Ok(records) + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let name = new_service.call(id).await.unwrap(); + /// # }; + /// # } + /// ``` + /// + /// Performing an action that must be run for both successes and failures: + /// + /// ``` + /// # use std::convert::TryFrom; + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # impl Service for DatabaseService { + /// # type Response = u8; + /// # type Error = u8; + /// # type Future = futures_util::future::Ready>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: String) -> Self::Future { + /// # futures_util::future::ready(Ok(String::new())) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service returning Result + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Print a message whenever a query completes. + /// let mut new_service = service.then(|result| { + /// println!("query completed; success={}", result.is_ok()); + /// result + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let response = new_service.call(id).await; + /// # }; + /// # } + /// ``` + /// + /// [`Error`]: crate::Service::Error + /// [`poll_ready`]: crate::Service::poll_ready + /// [`BoxError`]: crate::BoxError + fn then(self, f: F) -> Then + where + Self: Sized, + Error: From, + F: FnOnce(Result) -> Result + Clone, + { + Then::new(self, f) + } } impl ServiceExt for T where T: tower_service::Service {} diff --git a/tower/src/util/then.rs b/tower/src/util/then.rs new file mode 100644 index 000000000..a45d74b51 --- /dev/null +++ b/tower/src/util/then.rs @@ -0,0 +1,72 @@ +use futures_util::FutureExt; +use std::task::{Context, Poll}; +use tower_layer::Layer; +use tower_service::Service; + +pub use futures_util::future::Map as ThenFuture; + +/// Service returned by the [`then`] combinator. +/// +/// [`then`]: crate::util::ServiceExt::then +#[derive(Clone, Debug)] +pub struct Then { + inner: S, + f: F, +} + +/// A [`Layer`] that produces a [`Then`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Debug, Clone)] +pub struct ThenLayer { + f: F, +} + +impl Then { + /// Creates a new `Then` service. + pub fn new(inner: S, f: F) -> Self { + Then { f, inner } + } +} + +impl Service for Then +where + S: Service, + Error: From, + F: FnOnce(Result) -> Result + Clone, +{ + type Response = Response; + type Error = Error; + type Future = ThenFuture; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + #[inline] + fn call(&mut self, request: Request) -> Self::Future { + self.inner.call(request).map(self.f.clone()) + } +} + +impl ThenLayer { + /// Creates a new [`ThenLayer`] layer. + pub fn new(f: F) -> Self { + ThenLayer { f } + } +} + +impl Layer for ThenLayer +where + F: Clone, +{ + type Service = Then; + + fn layer(&self, inner: S) -> Self::Service { + Then { + f: self.f.clone(), + inner, + } + } +}