Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update tower-util and tower to std::future #330

Merged
merged 4 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[workspace]

members = [
# "tower",
"tower",
# "tower-balance",
"tower-buffer",
"tower-discover",
Expand Down
2 changes: 1 addition & 1 deletion tower-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ futures-executor-preview = "0.3.0-alpha.18"
tokio-test = "0.2.0-alpha.2"
tokio-sync = "0.2.0-alpha.2"
tower-service = "0.3.0-alpha.1"
pin-utils = "0.1.0-alpha.4"
pin-project = "0.4.0-alpha.10"
18 changes: 9 additions & 9 deletions tower-test/src/mock/future.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Future types

use crate::mock::error::{self, Error};
use futures_util::ready;
use pin_project::pin_project;
use tokio_sync::oneshot;

use std::{
Expand All @@ -10,16 +12,16 @@ use std::{
};

/// Future of the `Mock` response.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
rx: Option<Rx<T>>,
}

type Rx<T> = oneshot::Receiver<Result<T, Error>>;

impl<T> ResponseFuture<T> {
pin_utils::unsafe_pinned!(rx: Option<Rx<T>>);

pub(crate) fn new(rx: Rx<T>) -> ResponseFuture<T> {
ResponseFuture { rx: Some(rx) }
}
Expand All @@ -32,13 +34,11 @@ impl<T> ResponseFuture<T> {
impl<T> Future for ResponseFuture<T> {
type Output = Result<T, Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.rx().as_pin_mut() {
Some(rx) => match rx.poll(cx) {
Poll::Ready(Ok(Ok(v))) => Poll::Ready(Ok(v)),
Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e)),
Poll::Ready(Err(_)) => Poll::Ready(Err(error::Closed::new().into())),
Poll::Pending => Poll::Pending,
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.project().rx.as_pin_mut() {
Some(rx) => match ready!(rx.poll(cx)) {
Ok(r) => Poll::Ready(r),
Err(_) => Poll::Ready(Err(error::Closed::new().into())),
},
None => Poll::Ready(Err(error::Closed::new().into())),
}
Expand Down
4 changes: 4 additions & 0 deletions tower-util/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.3.0-alpha.1

- Move to `std::future`

# 0.1.0 (April 26, 2019)

- Initial release
16 changes: 10 additions & 6 deletions tower-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-util/0.1.0"
documentation = "https://docs.rs/tower-util/0.3.0-alpha.1"
description = """
Utilities for working with `Service`.
"""
Expand All @@ -24,10 +24,14 @@ edition = "2018"

[dependencies]
tower-service = "=0.3.0-alpha.1"
# tower-layer = "0.1.0"
tower-layer = { version = "0.3.0-alpha.1", path = "../tower-layer" }
pin-project = { version = "0.4.0-alpha.10", features = ["project_attr"] }
futures-util-preview = "0.3.0-alpha.18"
futures-core-preview = "0.3.0-alpha.18"

[dev-dependencies]
futures = { version = "=0.3.0-alpha.18", package = "futures-preview"}
# tokio-mock-task = "0.1.1"
# tower = { version = "0.1.0", path = "../tower" }
# tower-test = { version = "0.1.0", path = "../tower-test" }
futures-util-preview = "0.3.0-alpha.18"
tokio-test = "0.2.0-alpha.1"
tokio = "0.2.0-alpha.1"
tower = { version = "0.3.0-alpha.1", path = "../tower" }
tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" }
8 changes: 2 additions & 6 deletions tower-util/src/boxed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,14 @@
//! # Examples
//!
//! ```
//! # extern crate futures;
//! # extern crate tower_service;
//! # extern crate tower_util;
//! # use futures::*;
//! # use futures::future::FutureResult;
//! use futures_util::future::ready;
//! # use tower_service::Service;
//! # use tower_util::{BoxService, service_fn};
//! // Respond to requests using a closure, but closures cannot be named...
//! # pub fn main() {
//! let svc = service_fn(|mut request: String| {
//! request.push_str(" response");
//! Ok(request)
//! ready(Ok(request))
//! });
//!
//! let service: BoxService<String, String, ()> = BoxService::new(svc);
Expand Down
20 changes: 12 additions & 8 deletions tower-util/src/boxed/sync.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use futures::{Future, Poll};
use tower_service::Service;

use std::fmt;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

/// A boxed `Service + Send` trait object.
///
Expand All @@ -18,7 +22,7 @@ pub struct BoxService<T, U, E> {
///
/// This type alias represents a boxed future that is `Send` and can be moved
/// across threads.
type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send>;
type BoxFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send>>;

#[derive(Debug)]
struct Boxed<S> {
Expand All @@ -41,8 +45,8 @@ impl<T, U, E> Service<T> for BoxService<T, U, E> {
type Error = E;
type Future = BoxFuture<U, E>;

fn poll_ready(&mut self) -> Poll<(), E> {
self.inner.poll_ready()
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, request: T) -> BoxFuture<U, E> {
Expand All @@ -68,13 +72,13 @@ where
{
type Response = S::Response;
type Error = S::Error;
type Future = Box<dyn Future<Item = S::Response, Error = S::Error> + Send>;
type Future = Pin<Box<dyn Future<Output = Result<S::Response, S::Error>> + Send>>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, request: Request) -> Self::Future {
Box::new(self.inner.call(request))
Box::pin(self.inner.call(request))
}
}
20 changes: 12 additions & 8 deletions tower-util/src/boxed/unsync.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use futures::{Future, Poll};
use tower_service::Service;

use std::fmt;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

/// A boxed `Service` trait object.
pub struct UnsyncBoxService<T, U, E> {
Expand All @@ -12,7 +16,7 @@ pub struct UnsyncBoxService<T, U, E> {
///
/// This type alias represents a boxed future that is *not* `Send` and must
/// remain on the current thread.
type UnsyncBoxFuture<T, E> = Box<dyn Future<Item = T, Error = E>>;
type UnsyncBoxFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>>>>;

#[derive(Debug)]
struct UnsyncBoxed<S> {
Expand All @@ -35,8 +39,8 @@ impl<T, U, E> Service<T> for UnsyncBoxService<T, U, E> {
type Error = E;
type Future = UnsyncBoxFuture<U, E>;

fn poll_ready(&mut self) -> Poll<(), E> {
self.inner.poll_ready()
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, request: T) -> UnsyncBoxFuture<U, E> {
Expand All @@ -62,13 +66,13 @@ where
{
type Response = S::Response;
type Error = S::Error;
type Future = Box<dyn Future<Item = S::Response, Error = S::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<S::Response, S::Error>>>>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, request: Request) -> Self::Future {
Box::new(self.inner.call(request))
Box::pin(self.inner.call(request))
}
}
90 changes: 56 additions & 34 deletions tower-util/src/call_all/common.rs
Original file line number Diff line number Diff line change
@@ -1,95 +1,117 @@
use super::Error;
use futures::{try_ready, Async, Future, Poll, Stream};
use futures_core::{ready, Stream};
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;

/// TODO: Dox
#[pin_project]
#[derive(Debug)]
pub(crate) struct CallAll<Svc, S, Q> {
service: Svc,
service: Option<Svc>,
#[pin]
stream: S,
queue: Q,
eof: bool,
}

pub(crate) trait Drive<T: Future> {
pub(crate) trait Drive<F: Future> {
fn is_empty(&self) -> bool;

fn push(&mut self, future: T);
fn push(&mut self, future: F);

fn poll(&mut self) -> Poll<Option<T::Item>, T::Error>;
// NOTE: this implicitly requires Self: Unpin just like Service does
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>;
}

impl<Svc, S, Q> CallAll<Svc, S, Q>
where
Svc: Service<S::Item>,
Svc::Error: Into<Error>,
S: Stream,
S::Error: Into<Error>,
Q: Drive<Svc::Future>,
{
pub(crate) fn new(service: Svc, stream: S, queue: Q) -> CallAll<Svc, S, Q> {
CallAll {
service,
service: Some(service),
stream,
queue,
eof: false,
}
}

/// Extract the wrapped `Service`.
pub(crate) fn into_inner(self) -> Svc {
self.service
pub(crate) fn into_inner(mut self) -> Svc {
self.service.take().expect("Service already taken")
}

pub(crate) fn unordered(self) -> super::CallAllUnordered<Svc, S> {
/// Extract the wrapped `Service`.
pub(crate) fn take_service(mut self: Pin<&mut Self>) -> Svc {
self.project()
.service
.take()
.expect("Service already taken")
}

pub(crate) fn unordered(mut self) -> super::CallAllUnordered<Svc, S> {
assert!(self.queue.is_empty() && !self.eof);

super::CallAllUnordered::new(self.service, self.stream)
super::CallAllUnordered::new(self.service.take().unwrap(), self.stream)
}
}

impl<Svc, S, Q> Stream for CallAll<Svc, S, Q>
where
Svc: Service<S::Item>,
Svc::Error: Into<Error>,
Error: From<Svc::Error>,
S: Stream,
S::Error: Into<Error>,
Q: Drive<Svc::Future>,
{
type Item = Svc::Response;
type Error = Error;
type Item = Result<Svc::Response, Error>;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
let res = self.queue.poll().map_err(Into::into);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

loop {
// First, see if we have any responses to yield
if let Async::Ready(Some(rsp)) = res? {
return Ok(Async::Ready(Some(rsp)));
if let Poll::Ready(r) = this.queue.poll(cx) {
if let Some(rsp) = r.transpose()? {
return Poll::Ready(Some(Ok(rsp)));
}
}

// If there are no more requests coming, check if we're done
if self.eof {
if self.queue.is_empty() {
return Ok(Async::Ready(None));
if *this.eof {
if this.queue.is_empty() {
return Poll::Ready(None);
} else {
return Ok(Async::NotReady);
return Poll::Pending;
}
}

// Then, see that the service is ready for another request
try_ready!(self.service.poll_ready().map_err(Into::into));
let svc = this
.service
.as_mut()
.expect("Using CallAll after extracing inner Service");
ready!(svc.poll_ready(cx))?;

// If it is, gather the next request (if there is one)
match self.stream.poll().map_err(Into::into)? {
Async::Ready(Some(req)) => {
self.queue.push(self.service.call(req));
}
Async::Ready(None) => {
// We're all done once any outstanding requests have completed
self.eof = true;
}
Async::NotReady => {
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(r) => match r {
Some(req) => {
this.queue.push(svc.call(req));
}
None => {
// We're all done once any outstanding requests have completed
*this.eof = true;
}
},
Poll::Pending => {
// TODO: We probably want to "release" the slot we reserved in Svc here.
// It may be a while until we get around to actually using it.
}
Expand Down
Loading