Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(server): add poll_ready to Service and MakeService #1767

Merged
merged 1 commit into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ where
// can dispatch receive, or does it still care about, an incoming message?
match self.dispatch.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => unreachable!("dispatch not ready when conn is"),
Ok(Async::NotReady) => return Ok(Async::NotReady), // service might not be ready
Err(()) => {
trace!("dispatch no longer receiving messages");
self.close();
Expand Down Expand Up @@ -410,7 +410,11 @@ where
if self.in_flight.is_some() {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
self.service.poll_ready()
.map_err(|_e| {
// FIXME: return error value.
trace!("service closed");
})
}
}

Expand Down
42 changes: 30 additions & 12 deletions src/proto/h2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,37 @@ where
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
E: H2Exec<S::Future, B>,
{
while let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) {
trace!("incoming request");
let content_length = content_length_parse_all(req.headers());
let req = req.map(|stream| {
::Body::h2(stream, content_length)
});
let fut = H2Stream::new(service.call(req), respond);
exec.execute_h2stream(fut)?;
}
loop {
// At first, polls the readiness of supplied service.
match service.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => {
// use `poll_close` instead of `poll`, in order to avoid accepting a request.
try_ready!(self.conn.poll_close().map_err(::Error::new_h2));
trace!("incoming connection complete");
return Ok(Async::Ready(()));
}
Err(err) => {
trace!("service closed");
return Err(::Error::new_user_service(err));
}
}

// no more incoming streams...
trace!("incoming connection complete");
Ok(Async::Ready(()))
// When the service is ready, accepts an incoming request.
if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) {
trace!("incoming request");
let content_length = content_length_parse_all(req.headers());
let req = req.map(|stream| {
::Body::h2(stream, content_length)
});
let fut = H2Stream::new(service.call(req), respond);
exec.execute_h2stream(fut)?;
} else {
// no more incoming streams...
trace!("incoming connection complete");
return Ok(Async::Ready(()))
}
}
}
}

Expand Down
9 changes: 9 additions & 0 deletions src/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,15 @@ where
type Error = ::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.make_service.poll_ready_ref() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => {
trace!("make_service closed");
return Err(::Error::new_user_new_service(e));
}
}

if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) {
let new_fut = self.make_service.make_service_ref(&io);
Ok(Async::Ready(Some(Connecting {
Expand Down
21 changes: 19 additions & 2 deletions src/service/make_service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::error::Error as StdError;
use std::fmt;

use futures::{Future, IntoFuture};
use futures::{Async, Future, IntoFuture, Poll};

use body::Payload;
use super::Service;
Expand Down Expand Up @@ -30,6 +30,15 @@ pub trait MakeService<Ctx> {
/// The error type that can be returned when creating a new `Service`.
type MakeError: Into<Box<StdError + Send + Sync>>;

/// Returns `Ready` when the constructor is ready to create a new `Service`.
///
/// The implementation of this method is allowed to return a `Ready` even if
/// the factory is not ready to create a new service. In this case, the future
/// returned from `make_service` will resolve to an error.
fn poll_ready(&mut self) -> Poll<(), Self::MakeError> {
Ok(Async::Ready(()))
}

/// Create a new `Service`.
fn make_service(&mut self, ctx: Ctx) -> Self::Future;
}
Expand All @@ -46,7 +55,8 @@ pub trait MakeServiceRef<Ctx>: self::sealed::Sealed<Ctx> {
ResBody=Self::ResBody,
Error=Self::Error,
>;
type Future: Future<Item=Self::Service>;
type MakeError: Into<Box<StdError + Send + Sync>>;
type Future: Future<Item=Self::Service, Error=Self::MakeError>;

// Acting like a #[non_exhaustive] for associated types of this trait.
//
Expand All @@ -59,6 +69,8 @@ pub trait MakeServiceRef<Ctx>: self::sealed::Sealed<Ctx> {
// if necessary.
type __DontNameMe: self::sealed::CantImpl;

fn poll_ready_ref(&mut self) -> Poll<(), Self::MakeError>;

fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future;
}

Expand All @@ -76,10 +88,15 @@ where
type Service = S;
type ReqBody = IB;
type ResBody = OB;
type MakeError = ME;
type Future = F;

type __DontNameMe = self::sealed::CantName;

fn poll_ready_ref(&mut self) -> Poll<(), Self::MakeError> {
self.poll_ready()
}

fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future {
self.make_service(ctx)
}
Expand Down
11 changes: 10 additions & 1 deletion src/service/new_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::error::Error as StdError;

use futures::{Future, IntoFuture};
use futures::{Async, Future, IntoFuture, Poll};

use body::Payload;
use super::{MakeService, Service};
Expand Down Expand Up @@ -29,6 +29,11 @@ pub trait NewService {
/// The error type that can be returned when creating a new `Service`.
type InitError: Into<Box<StdError + Send + Sync>>;

#[doc(hidden)]
fn poll_ready(&mut self) -> Poll<(), Self::InitError> {
Ok(Async::Ready(()))
}

/// Create a new `Service`.
fn new_service(&self) -> Self::Future;
}
Expand Down Expand Up @@ -63,6 +68,10 @@ where
type Future = N::Future;
type MakeError = N::InitError;

fn poll_ready(&mut self) -> Poll<(), Self::MakeError> {
NewService::poll_ready(self)
}

fn make_service(&mut self, _: Ctx) -> Self::Future {
self.new_service()
}
Expand Down
11 changes: 10 additions & 1 deletion src/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::error::Error as StdError;
use std::fmt;
use std::marker::PhantomData;

use futures::{future, Future, IntoFuture};
use futures::{future, Async, Future, IntoFuture, Poll};

use body::Payload;
use common::Never;
Expand All @@ -26,6 +26,15 @@ pub trait Service {
/// The `Future` returned by this `Service`.
type Future: Future<Item=Response<Self::ResBody>, Error=Self::Error>;

/// Returns `Ready` when the service is able to process requests.
///
/// The implementation of this method is allowed to return a `Ready` even if
/// the service is not ready to process. In this case, the future returned
/// from `call` will resolve to an error.
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably not be doc(hidden), but include similar wording to what it says in tower-service.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added doc comment to them.

Ok(Async::Ready(()))
}

/// Calls this `Service` with a request, returning a `Future` of the response.
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future;
}
Expand Down