From b19cd795ebdc460d29697380c72fb98ea570dfe7 Mon Sep 17 00:00:00 2001 From: Harry Barber Date: Mon, 23 Nov 2020 11:11:26 +0000 Subject: [PATCH 1/7] Add and_then --- tower/src/util/and_then.rs | 81 ++++++++++++++++++++++++++++++++++++++ tower/src/util/mod.rs | 9 +++++ 2 files changed, 90 insertions(+) create mode 100644 tower/src/util/and_then.rs diff --git a/tower/src/util/and_then.rs b/tower/src/util/and_then.rs new file mode 100644 index 000000000..9215fa085 --- /dev/null +++ b/tower/src/util/and_then.rs @@ -0,0 +1,81 @@ +use std::marker::PhantomData; +use std::task::{Context, Poll}; + +use futures_core::TryFuture; +use futures_util::{future::AndThen as AndThenFut, TryFutureExt}; +use tower_layer::Layer; +use tower_service::Service; + +/// Service returned by the [`and_then`] combinator. +/// +/// [`and_then`]: crate::util::ServiceExt::and_then +#[derive(Clone, Debug)] +pub struct AndThen { + inner: S, + f: F, + _fut: PhantomData, +} + +impl AndThen { + /// Creates a new `AndThen` service. + pub fn new(inner: S, f: F) -> Self { + AndThen { + f, + inner, + _fut: Default::default(), + } + } +} + +impl Service for AndThen +where + S: Service, + F: FnOnce(S::Response) -> Fut + Clone, + Fut: TryFuture, +{ + type Response = Fut::Ok; + type Error = Fut::Error; + type Future = AndThenFut; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + self.inner.call(request).and_then(self.f.clone()) + } +} + +/// A [`Layer`] that produces a [`AndThen`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Debug)] +pub struct AndThenLayer { + f: F, + _fut: PhantomData, +} + +impl AndThenLayer { + /// Creates a new [`AndThenLayer`] layer. + pub fn new(f: F) -> Self { + AndThenLayer { + f, + _fut: Default::default(), + } + } +} + +impl Layer for AndThenLayer +where + F: Clone, +{ + type Service = AndThen; + + fn layer(&self, inner: S) -> Self::Service { + AndThen { + f: self.f.clone(), + inner, + _fut: Default::default(), + } + } +} diff --git a/tower/src/util/mod.rs b/tower/src/util/mod.rs index 24dde2377..322f77346 100644 --- a/tower/src/util/mod.rs +++ b/tower/src/util/mod.rs @@ -1,5 +1,6 @@ //! Various utility types and functions that are generally with Tower. +mod and_then; mod boxed; mod call_all; mod either; @@ -17,6 +18,7 @@ mod service_fn; mod then; pub use self::{ + and_then::{AndThen, AndThenLayer}, boxed::{BoxService, UnsyncBoxService}, either::Either, future_service::{future_service, FutureService}, @@ -89,6 +91,13 @@ pub trait ServiceExt: tower_service::Service { { CallAll::new(self, reqs) } + + fn and_then(self, f: F) -> AndThen + where + Self: Sized, + F: FnOnce(Self::Response) -> Fut + Clone { + AndThen::new(self, f) + } /// Maps this service's response value to a different value. This does not /// alter the behaviour of the [`poll_ready`] method. From 0b108ca41e3db2d8441c9299822f64e9dbd32c0e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 24 Nov 2020 13:55:43 -0800 Subject: [PATCH 2/7] remove future type param (#1) --- tower/src/util/and_then.rs | 29 +++++++++-------------------- tower/src/util/mod.rs | 11 ++++++----- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/tower/src/util/and_then.rs b/tower/src/util/and_then.rs index 9215fa085..bc99f096d 100644 --- a/tower/src/util/and_then.rs +++ b/tower/src/util/and_then.rs @@ -1,4 +1,3 @@ -use std::marker::PhantomData; use std::task::{Context, Poll}; use futures_core::TryFuture; @@ -10,24 +9,19 @@ use tower_service::Service; /// /// [`and_then`]: crate::util::ServiceExt::and_then #[derive(Clone, Debug)] -pub struct AndThen { +pub struct AndThen { inner: S, f: F, - _fut: PhantomData, } -impl AndThen { +impl AndThen { /// Creates a new `AndThen` service. pub fn new(inner: S, f: F) -> Self { - AndThen { - f, - inner, - _fut: Default::default(), - } + AndThen { f, inner } } } -impl Service for AndThen +impl Service for AndThen where S: Service, F: FnOnce(S::Response) -> Fut + Clone, @@ -50,32 +44,27 @@ where /// /// [`Layer`]: tower_layer::Layer #[derive(Debug)] -pub struct AndThenLayer { +pub struct AndThenLayer { f: F, - _fut: PhantomData, } -impl AndThenLayer { +impl AndThenLayer { /// Creates a new [`AndThenLayer`] layer. pub fn new(f: F) -> Self { - AndThenLayer { - f, - _fut: Default::default(), - } + AndThenLayer { f } } } -impl Layer for AndThenLayer +impl Layer for AndThenLayer where F: Clone, { - type Service = AndThen; + type Service = AndThen; fn layer(&self, inner: S) -> Self::Service { AndThen { f: self.f.clone(), inner, - _fut: Default::default(), } } } diff --git a/tower/src/util/mod.rs b/tower/src/util/mod.rs index 322f77346..f5101b8f2 100644 --- a/tower/src/util/mod.rs +++ b/tower/src/util/mod.rs @@ -91,12 +91,13 @@ pub trait ServiceExt: tower_service::Service { { CallAll::new(self, reqs) } - - fn and_then(self, f: F) -> AndThen + + fn and_then(self, f: F) -> AndThen where - Self: Sized, - F: FnOnce(Self::Response) -> Fut + Clone { - AndThen::new(self, f) + Self: Sized, + F: Clone, + { + AndThen::new(self, f) } /// Maps this service's response value to a different value. This does not From cfd520d87653db0a84f261434b2003c2b5a88174 Mon Sep 17 00:00:00 2001 From: Harry Barber Date: Fri, 27 Nov 2020 15:47:12 +0000 Subject: [PATCH 3/7] Add documentation --- tower/src/util/mod.rs | 60 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/tower/src/util/mod.rs b/tower/src/util/mod.rs index f5101b8f2..828fa22c5 100644 --- a/tower/src/util/mod.rs +++ b/tower/src/util/mod.rs @@ -92,6 +92,66 @@ pub trait ServiceExt: tower_service::Service { CallAll::new(self, reqs) } + /// Executes a new future after this service's after this services future resolves. This does + /// not alter the behaviour of the [`poll_ready`] method. + /// + /// This method can be used to change the [`Response`] type of the service + /// into a different type. You can use this method to chain along a computation once the + /// services response has been resolved. + /// + /// [`Response`]: crate::Service::Response + /// [`poll_ready`]: crate::Service::poll_ready + /// + /// # Example + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # struct Record { + /// # pub name: String, + /// # pub age: u16 + /// # } + /// # + /// # impl Service for DatabaseService { + /// # type Response = Record; + /// # type Error = u8; + /// # type Future = futures_util::future::Ready>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 })) + /// # } + /// # } + /// # + /// # async fn avatar_lookup(name: String) -> Result, u8> { Ok(vec![]) } + /// # + /// # fn main() { + /// # async { + /// // A service returning Result + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Map the response into a new response + /// let mut new_service = service.and_then(|record: Record| async move { + /// let name = record.name; + /// avatar_lookup(name).await + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let avatar = new_service.call(id).await.unwrap(); + /// # }; + /// # } + /// ``` fn and_then(self, f: F) -> AndThen where Self: Sized, From 6abc004ba08f0416725b4ba07d24851e7d6744be Mon Sep 17 00:00:00 2001 From: Harry Barber Date: Wed, 6 Jan 2021 20:30:21 +0000 Subject: [PATCH 4/7] Make and_then Into::into errors --- tower/src/util/and_then.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/tower/src/util/and_then.rs b/tower/src/util/and_then.rs index bc99f096d..b93d4b5c8 100644 --- a/tower/src/util/and_then.rs +++ b/tower/src/util/and_then.rs @@ -1,7 +1,10 @@ use std::task::{Context, Poll}; use futures_core::TryFuture; -use futures_util::{future::AndThen as AndThenFut, TryFutureExt}; +use futures_util::{ + future::{AndThen as AndThenFut, ErrInto as ErrIntoFut}, + TryFutureExt, +}; use tower_layer::Layer; use tower_service::Service; @@ -21,22 +24,23 @@ impl AndThen { } } -impl Service for AndThen +impl Service for AndThen where S: Service, + S::Error: Into, F: FnOnce(S::Response) -> Fut + Clone, - Fut: TryFuture, + Fut: TryFuture, { type Response = Fut::Ok; - type Error = Fut::Error; - type Future = AndThenFut; + type Error = Error; + type Future = AndThenFut, Fut, F>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) + self.inner.poll_ready(cx).map_err(Into::into) } fn call(&mut self, request: Request) -> Self::Future { - self.inner.call(request).and_then(self.f.clone()) + self.inner.call(request).err_into().and_then(self.f.clone()) } } From 143c8a662e483a5c61459348fd3600931f106bdd Mon Sep 17 00:00:00 2001 From: Harry Barber Date: Wed, 6 Jan 2021 20:44:04 +0000 Subject: [PATCH 5/7] Remove unnecessary parameter --- tower/src/util/and_then.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tower/src/util/and_then.rs b/tower/src/util/and_then.rs index b93d4b5c8..f40f650bd 100644 --- a/tower/src/util/and_then.rs +++ b/tower/src/util/and_then.rs @@ -24,16 +24,16 @@ impl AndThen { } } -impl Service for AndThen +impl Service for AndThen where S: Service, - S::Error: Into, + S::Error: Into, F: FnOnce(S::Response) -> Fut + Clone, - Fut: TryFuture, + Fut: TryFuture, { type Response = Fut::Ok; - type Error = Error; - type Future = AndThenFut, Fut, F>; + type Error = Fut::Error; + type Future = AndThenFut, Fut, F>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx).map_err(Into::into) From 71b893a4980f669e6c1615bf97df9c89e66fb500 Mon Sep 17 00:00:00 2001 From: Harry Barber Date: Wed, 6 Jan 2021 20:50:10 +0000 Subject: [PATCH 6/7] Reorder code blocks to match other combinators --- tower/src/util/and_then.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tower/src/util/and_then.rs b/tower/src/util/and_then.rs index f40f650bd..a2aa89325 100644 --- a/tower/src/util/and_then.rs +++ b/tower/src/util/and_then.rs @@ -17,6 +17,14 @@ pub struct AndThen { f: F, } +/// A [`Layer`] that produces a [`AndThen`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Debug)] +pub struct AndThenLayer { + f: F, +} + impl AndThen { /// Creates a new `AndThen` service. pub fn new(inner: S, f: F) -> Self { @@ -44,14 +52,6 @@ where } } -/// A [`Layer`] that produces a [`AndThen`] service. -/// -/// [`Layer`]: tower_layer::Layer -#[derive(Debug)] -pub struct AndThenLayer { - f: F, -} - impl AndThenLayer { /// Creates a new [`AndThenLayer`] layer. pub fn new(f: F) -> Self { From 8c712069f6854cdbfa895f7860f783bb046ddb28 Mon Sep 17 00:00:00 2001 From: Harry Barber Date: Wed, 6 Jan 2021 22:05:39 +0000 Subject: [PATCH 7/7] Newtype the AndThen future --- tower/src/util/and_then.rs | 39 ++++++++++++++++++++++++++++++++------ tower/src/util/mod.rs | 1 + 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/tower/src/util/and_then.rs b/tower/src/util/and_then.rs index a2aa89325..0cc892afd 100644 --- a/tower/src/util/and_then.rs +++ b/tower/src/util/and_then.rs @@ -1,10 +1,9 @@ +use std::future::Future; +use std::pin::Pin; use std::task::{Context, Poll}; use futures_core::TryFuture; -use futures_util::{ - future::{AndThen as AndThenFut, ErrInto as ErrIntoFut}, - TryFutureExt, -}; +use futures_util::{future, TryFutureExt}; use tower_layer::Layer; use tower_service::Service; @@ -17,6 +16,34 @@ pub struct AndThen { f: F, } +/// Response future from [`AndThen`] services. +/// +/// [`AndThen`]: crate::util::AndThen +#[pin_project::pin_project] +pub struct AndThenFuture( + #[pin] pub(crate) future::AndThen, F2, N>, +); + +impl std::fmt::Debug for AndThenFuture { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("AndThenFuture") + .field(&format_args!("...")) + .finish() + } +} + +impl Future for AndThenFuture +where + future::AndThen, F2, N>: Future, +{ + type Output = , F2, N> as Future>::Output; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().0.poll(cx) + } +} + /// A [`Layer`] that produces a [`AndThen`] service. /// /// [`Layer`]: tower_layer::Layer @@ -41,14 +68,14 @@ where { type Response = Fut::Ok; type Error = Fut::Error; - type Future = AndThenFut, Fut, F>; + type Future = AndThenFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx).map_err(Into::into) } fn call(&mut self, request: Request) -> Self::Future { - self.inner.call(request).err_into().and_then(self.f.clone()) + AndThenFuture(self.inner.call(request).err_into().and_then(self.f.clone())) } } diff --git a/tower/src/util/mod.rs b/tower/src/util/mod.rs index 828fa22c5..c19aca31d 100644 --- a/tower/src/util/mod.rs +++ b/tower/src/util/mod.rs @@ -45,6 +45,7 @@ pub mod error { pub mod future { //! Future types + pub use super::and_then::AndThenFuture; pub use super::map_err::MapErrFuture; pub use super::map_response::MapResponseFuture; pub use super::map_result::MapResultFuture;