From f17139070382a239cacb0ae173e1a6241c35262b Mon Sep 17 00:00:00 2001 From: Harry Barber Date: Wed, 6 Jan 2021 23:08:05 +0000 Subject: [PATCH] util: Add and_then combinator (#485) ## Motivation https://docs.rs/futures/0.3.8/futures/future/trait.TryFutureExt.html#method.and_then is a useful method on futures. Perhaps it'd be nice to replicate this for the `ServiceExt` API. Co-authored-by: Harry Barber Co-authored-by: Eliza Weisman --- tower/src/util/and_then.rs | 101 +++++++++++++++++++++++++++++++++++++ tower/src/util/mod.rs | 71 ++++++++++++++++++++++++++ 2 files changed, 172 insertions(+) create mode 100644 tower/src/util/and_then.rs diff --git a/tower/src/util/and_then.rs b/tower/src/util/and_then.rs new file mode 100644 index 000000000..0cc892afd --- /dev/null +++ b/tower/src/util/and_then.rs @@ -0,0 +1,101 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures_core::TryFuture; +use futures_util::{future, TryFutureExt}; +use tower_layer::Layer; +use tower_service::Service; + +/// Service returned by the [`and_then`] combinator. +/// +/// [`and_then`]: crate::util::ServiceExt::and_then +#[derive(Clone, Debug)] +pub struct AndThen { + inner: S, + f: F, +} + +/// Response future from [`AndThen`] services. +/// +/// [`AndThen`]: crate::util::AndThen +#[pin_project::pin_project] +pub struct AndThenFuture( + #[pin] pub(crate) future::AndThen, F2, N>, +); + +impl std::fmt::Debug for AndThenFuture { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("AndThenFuture") + .field(&format_args!("...")) + .finish() + } +} + +impl Future for AndThenFuture +where + future::AndThen, F2, N>: Future, +{ + type Output = , F2, N> as Future>::Output; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().0.poll(cx) + } +} + +/// A [`Layer`] that produces a [`AndThen`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Debug)] +pub struct AndThenLayer { + f: F, +} + +impl AndThen { + /// Creates a new `AndThen` service. + pub fn new(inner: S, f: F) -> Self { + AndThen { f, inner } + } +} + +impl Service for AndThen +where + S: Service, + S::Error: Into, + F: FnOnce(S::Response) -> Fut + Clone, + Fut: TryFuture, +{ + type Response = Fut::Ok; + type Error = Fut::Error; + type Future = AndThenFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, request: Request) -> Self::Future { + AndThenFuture(self.inner.call(request).err_into().and_then(self.f.clone())) + } +} + +impl AndThenLayer { + /// Creates a new [`AndThenLayer`] layer. + pub fn new(f: F) -> Self { + AndThenLayer { f } + } +} + +impl Layer for AndThenLayer +where + F: Clone, +{ + type Service = AndThen; + + fn layer(&self, inner: S) -> Self::Service { + AndThen { + f: self.f.clone(), + inner, + } + } +} diff --git a/tower/src/util/mod.rs b/tower/src/util/mod.rs index 24dde2377..c19aca31d 100644 --- a/tower/src/util/mod.rs +++ b/tower/src/util/mod.rs @@ -1,5 +1,6 @@ //! Various utility types and functions that are generally with Tower. +mod and_then; mod boxed; mod call_all; mod either; @@ -17,6 +18,7 @@ mod service_fn; mod then; pub use self::{ + and_then::{AndThen, AndThenLayer}, boxed::{BoxService, UnsyncBoxService}, either::Either, future_service::{future_service, FutureService}, @@ -43,6 +45,7 @@ pub mod error { pub mod future { //! Future types + pub use super::and_then::AndThenFuture; pub use super::map_err::MapErrFuture; pub use super::map_response::MapResponseFuture; pub use super::map_result::MapResultFuture; @@ -90,6 +93,74 @@ pub trait ServiceExt: tower_service::Service { CallAll::new(self, reqs) } + /// Executes a new future after this service's after this services future resolves. This does + /// not alter the behaviour of the [`poll_ready`] method. + /// + /// This method can be used to change the [`Response`] type of the service + /// into a different type. You can use this method to chain along a computation once the + /// services response has been resolved. + /// + /// [`Response`]: crate::Service::Response + /// [`poll_ready`]: crate::Service::poll_ready + /// + /// # Example + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # struct Record { + /// # pub name: String, + /// # pub age: u16 + /// # } + /// # + /// # impl Service for DatabaseService { + /// # type Response = Record; + /// # type Error = u8; + /// # type Future = futures_util::future::Ready>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 })) + /// # } + /// # } + /// # + /// # async fn avatar_lookup(name: String) -> Result, u8> { Ok(vec![]) } + /// # + /// # fn main() { + /// # async { + /// // A service returning Result + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Map the response into a new response + /// let mut new_service = service.and_then(|record: Record| async move { + /// let name = record.name; + /// avatar_lookup(name).await + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let avatar = new_service.call(id).await.unwrap(); + /// # }; + /// # } + /// ``` + fn and_then(self, f: F) -> AndThen + where + Self: Sized, + F: Clone, + { + AndThen::new(self, f) + } + /// Maps this service's response value to a different value. This does not /// alter the behaviour of the [`poll_ready`] method. ///