diff --git a/examples/multi_server.rs b/examples/multi_server.rs index a997076476..c3de1c2a50 100644 --- a/examples/multi_server.rs +++ b/examples/multi_server.rs @@ -4,6 +4,7 @@ extern crate futures; extern crate tokio_core; extern crate pretty_env_logger; +use futures::{Future, Stream}; use futures::future::FutureResult; use hyper::{Get, StatusCode}; @@ -14,10 +15,9 @@ use hyper::server::{Http, Service, Request, Response}; static INDEX1: &'static [u8] = b"The 1st service!"; static INDEX2: &'static [u8] = b"The 2nd service!"; -struct Service1; -struct Service2; +struct Srv(&'static [u8]); -impl Service for Service1 { +impl Service for Srv { type Request = Request; type Response = Response; type Error = hyper::Error; @@ -27,30 +27,8 @@ impl Service for Service1 { futures::future::ok(match (req.method(), req.path()) { (&Get, "/") => { Response::new() - .with_header(ContentLength(INDEX1.len() as u64)) - .with_body(INDEX1) - }, - _ => { - Response::new() - .with_status(StatusCode::NotFound) - } - }) - } - -} - -impl Service for Service2 { - type Request = Request; - type Response = Response; - type Error = hyper::Error; - type Future = FutureResult; - - fn call(&self, req: Request) -> Self::Future { - futures::future::ok(match (req.method(), req.path()) { - (&Get, "/") => { - Response::new() - .with_header(ContentLength(INDEX2.len() as u64)) - .with_body(INDEX2) + .with_header(ContentLength(self.0.len() as u64)) + .with_body(self.0) }, _ => { Response::new() @@ -70,13 +48,23 @@ fn main() { let mut core = Core::new().unwrap(); let handle = core.handle(); - let srv1 = Http::new().bind_handle(&addr1,|| Ok(Service1), &handle).unwrap(); - let srv2 = Http::new().bind_handle(&addr2,|| Ok(Service2), &handle).unwrap(); + let srv1 = Http::new().serve_addr_handle(&addr1, &handle, || Ok(Srv(INDEX1))).unwrap(); + let srv2 = Http::new().serve_addr_handle(&addr2, &handle, || Ok(Srv(INDEX2))).unwrap(); + + println!("Listening on http://{}", srv1.incoming_ref().local_addr()); + println!("Listening on http://{}", srv2.incoming_ref().local_addr()); + + let handle1 = handle.clone(); + handle.spawn(srv1.for_each(move |conn| { + handle1.spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err))); + Ok(()) + }).map_err(|_| ())); - println!("Listening on http://{}", srv1.local_addr().unwrap()); - println!("Listening on http://{}", srv2.local_addr().unwrap()); + let handle2 = handle.clone(); + handle.spawn(srv2.for_each(move |conn| { + handle2.spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err))); + Ok(()) + }).map_err(|_| ())); - handle.spawn(srv1.shutdown_signal(futures::future::empty::<(), ()>())); - handle.spawn(srv2.shutdown_signal(futures::future::empty::<(), ()>())); core.run(futures::future::empty::<(), ()>()).unwrap(); } diff --git a/src/server/mod.rs b/src/server/mod.rs index a5d5eacc88..57614dda43 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -17,7 +17,7 @@ use std::rc::{Rc, Weak}; use std::time::Duration; use futures::task::{self, Task}; -use futures::future::{self, Select, Map}; +use futures::future::{self, Map}; use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink}; #[cfg(feature = "compat")] @@ -25,7 +25,7 @@ use http; use tokio_io::{AsyncRead, AsyncWrite}; use tokio::reactor::{Core, Handle, Timeout}; -use tokio::net::TcpListener; +use tokio::net::{TcpListener, TcpStream}; use tokio_proto::BindServer; use tokio_proto::streaming::Message; use tokio_proto::streaming::pipeline::{Transport, Frame, ServerProto}; @@ -36,30 +36,11 @@ use proto::response; use proto::request; #[cfg(feature = "compat")] use proto::Body; +use self::hyper_service::HyperService; pub use proto::response::Response; pub use proto::request::Request; -// The `Server` can be created use its own `Core`, or an shared `Handle`. -enum Reactor { - // Own its `Core` - Core(Core), - // Share `Handle` with others - Handle(Handle), -} - -impl Reactor { - /// Returns a handle to the underlying event loop that this server will be - /// running on. - #[inline] - pub fn handle(&self) -> Handle { - match *self { - Reactor::Core(ref core) => core.handle(), - Reactor::Handle(ref handle) => handle.clone(), - } - } -} - /// An instance of the HTTP protocol, and implementation of tokio-proto's /// `ServerProto` trait. /// @@ -82,23 +63,62 @@ where B: Stream, { protocol: Http, new_service: S, - reactor: Reactor, + reactor: Core, listener: TcpListener, shutdown_timeout: Duration, no_proto: bool, } -/// The Future of an Server. -pub struct ServerFuture -where B: Stream, - B::Item: AsRef<[u8]>, +/// A stream mapping incoming IOs to new services. +/// +/// Yields `Connection`s that are futures that should be put on a reactor. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct Serve { + incoming: I, + new_service: S, + protocol: Http, +} + +/* +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct SpawnAll { + executor: E, + serve: Serve, +} +*/ + +/// A stream of connections from binding to an address. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct AddrStream { + addr: SocketAddr, + listener: TcpListener, +} + +/// A future binding a connection with a Service. +/// +/// Polling this future will drive HTTP forward. +#[must_use = "futures do nothing unless polled"] +pub struct Connection +where + S: HyperService, + S::ResponseBody: Stream, + ::Item: AsRef<[u8]>, { - server: Server, - info: Rc>, - shutdown_signal: F, - shutdown: Option>, + conn: proto::dispatch::Dispatcher< + proto::dispatch::Server, + S::ResponseBody, + I, + ::Item, + proto::ServerTransaction, + proto::KA, + >, } +// ===== impl Http ===== + impl + 'static> Http { /// Creates a new instance of the HTTP protocol, ready to spawn a server or /// start accepting connections. @@ -148,30 +168,7 @@ impl + 'static> Http { Ok(Server { new_service: new_service, - reactor: Reactor::Core(core), - listener: listener, - protocol: self.clone(), - shutdown_timeout: Duration::new(1, 0), - }) - } - - /// This method allows the ability to share a `Core` with multiple servers. - /// - /// Bind the provided `addr` and return a server with a shared `Core`. - /// - /// This is method will bind the `addr` provided with a new TCP listener ready - /// to accept connections. Each connection will be processed with the - /// `new_service` object provided as well, creating a new service per - /// connection. - pub fn bind_handle(&self, addr: &SocketAddr, new_service: S, handle: &Handle) -> ::Result> - where S: NewService, Error = ::Error> + 'static, - Bd: Stream, - { - let listener = TcpListener::bind(addr, &handle)?; - - Ok(Server { - new_service: new_service, - reactor: Reactor::Handle(handle.clone()), + reactor: core, listener: listener, protocol: self.clone(), shutdown_timeout: Duration::new(1, 0), @@ -179,6 +176,7 @@ impl + 'static> Http { }) } + /// Bind a `NewService` using types from the `http` crate. /// /// See `Http::bind`. @@ -220,29 +218,6 @@ impl + 'static> Http { }) } - /// Bind a connection together with a Service. - /// - /// This returns a Future that must be polled in order for HTTP to be - /// driven on the connection. - /// - /// This additionally skips the tokio-proto infrastructure internally. - pub fn no_proto(&self, io: I, service: S) -> Connection - where S: Service, Error = ::Error> + 'static, - Bd: Stream + 'static, - I: AsyncRead + AsyncWrite + 'static, - - { - let ka = if self.keep_alive { - proto::KA::Busy - } else { - proto::KA::Disabled - }; - let mut conn = proto::Conn::new(io, ka); - conn.set_flush_pipeline(self.pipeline); - Connection { - conn: proto::dispatch::Dispatcher::new(proto::dispatch::Server::new(service), conn), - } - } /// Bind a `Service` using types from the `http` crate. /// @@ -262,123 +237,70 @@ impl + 'static> Http { remote_addr: remote_addr, }) } -} -use self::hyper_service::HyperService; -mod hyper_service { - use super::{Request, Response, Service, Stream}; - /// A "trait alias" for any type that implements `Service` with hyper's - /// Request, Response, and Error types, and a streaming body. + /// This method allows the ability to share a `Core` with multiple servers. /// - /// There is an auto implementation inside hyper, so no one can actually - /// implement this trait. It simply exists to reduce the amount of generics - /// needed. - pub trait HyperService: Service + Sealed { - #[doc(hidden)] - type ResponseBody; - #[doc(hidden)] - type Sealed: Sealed2; + /// Bind the provided `addr` and return a server with a shared `Core`. + /// + /// This is method will bind the `addr` provided with a new TCP listener ready + /// to accept connections. Each connection will be processed with the + /// `new_service` object provided as well, creating a new service per + /// connection. + pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result> + where S: NewService, Error = ::Error>, + Bd: Stream, + { + let listener = TcpListener::bind(addr, &handle)?; + let incoming = AddrStream { + addr: listener.local_addr()?, + listener: listener, + }; + Ok(self.serve(incoming, new_service)) } - pub trait Sealed {} - pub trait Sealed2 {} - - #[allow(missing_debug_implementations)] - pub struct Opaque { - _inner: (), + //TODO: make public + fn serve(&self, incoming: I, new_service: S) -> Serve + where I: Stream, + I::Item: AsyncRead + AsyncWrite, + S: NewService, Error = ::Error>, + Bd: Stream, + { + Serve { + incoming: incoming, + new_service: new_service, + protocol: Http { + keep_alive: self.keep_alive, + pipeline: self.pipeline, + _marker: PhantomData, + }, + } } - impl Sealed2 for Opaque {} - - impl Sealed for S - where - S: Service< - Request=Request, - Response=Response, - Error=::Error, - >, - B: Stream, - B::Item: AsRef<[u8]>, - {} + /// Bind a connection together with a Service. + /// + /// This returns a Future that must be polled in order for HTTP to be + /// driven on the connection. + pub fn serve_connection(&self, io: I, service: S) -> Connection + where S: Service, Error = ::Error>, + Bd: Stream, + Bd::Item: AsRef<[u8]>, + I: AsyncRead + AsyncWrite, - impl HyperService for S - where - S: Service< - Request=Request, - Response=Response, - Error=::Error, - >, - S: Sealed, - B: Stream, - B::Item: AsRef<[u8]>, { - type ResponseBody = B; - type Sealed = Opaque; + let ka = if self.keep_alive { + proto::KA::Busy + } else { + proto::KA::Disabled + }; + let mut conn = proto::Conn::new(io, ka); + conn.set_flush_pipeline(self.pipeline); + Connection { + conn: proto::dispatch::Dispatcher::new(proto::dispatch::Server::new(service), conn), + } } - -} -/// A future binding a connection with a Service. -/// -/// Polling this future will drive HTTP forward. -#[must_use = "futures do nothing unless polled"] -pub struct Connection -where - S: HyperService, - S::ResponseBody: Stream, - ::Item: AsRef<[u8]>, -{ - conn: proto::dispatch::Dispatcher, S::ResponseBody, I, ::Item, proto::ServerTransaction, proto::KA>, } -impl Future for Connection -where S: Service, Error = ::Error> + 'static, - I: AsyncRead + AsyncWrite + 'static, - B: Stream + 'static, - B::Item: AsRef<[u8]>, -{ - type Item = self::unnameable::Opaque; - type Error = ::Error; - fn poll(&mut self) -> Poll { - try_ready!(self.conn.poll()); - Ok(self::unnameable::opaque().into()) - } -} - -impl fmt::Debug for Connection -where - S: HyperService, - S::ResponseBody: Stream, - ::Item: AsRef<[u8]>, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Connection") - .finish() - } -} - -mod unnameable { - // This type is specifically not exported outside the crate, - // so no one can actually name the type. With no methods, we make no - // promises about this type. - // - // All of that to say we can eventually replace the type returned - // to something else, and it would not be a breaking change. - // - // We may want to eventually yield the `T: AsyncRead + AsyncWrite`, which - // doesn't have a `Debug` bound. So, this type can't implement `Debug` - // either, so the type change doesn't break people. - #[allow(missing_debug_implementations)] - pub struct Opaque { - _inner: (), - } - - pub fn opaque() -> Opaque { - Opaque { - _inner: (), - } - } -} impl Clone for Http { fn clone(&self) -> Http { @@ -584,6 +506,8 @@ impl Service for HttpService } } +// ===== impl Server ===== + impl Server where S: NewService, Error = ::Error> + 'static, B: Stream + 'static, @@ -619,21 +543,6 @@ impl Server self } - /// Configure the `shutdown_signal`. - pub fn shutdown_signal(self, signal: F) -> ServerFuture - where F: Future - { - ServerFuture { - server: self, - info: Rc::new(RefCell::new(Info { - active: 0, - blocker: None, - })), - shutdown_signal: signal, - shutdown: None, - } - } - /// Execute this server infinitely. /// /// This method does not currently return, but it will return an error if @@ -658,14 +567,9 @@ impl Server pub fn run_until(self, shutdown_signal: F) -> ::Result<()> where F: Future, { - let Server { protocol, new_service, reactor, listener, shutdown_timeout, no_proto } = self; + let Server { protocol, new_service, mut reactor, listener, shutdown_timeout, no_proto } = self; - let mut core = match reactor { - Reactor::Core(core) => core, - _ => panic!("Server does not own its core, use `Handle::spawn()` to run the service!"), - }; - - let handle = core.handle(); + let handle = reactor.handle(); // Mini future to track the number of active services let info = Rc::new(RefCell::new(Info { @@ -681,7 +585,7 @@ impl Server }; info.borrow_mut().active += 1; if no_proto { - let fut = protocol.no_proto(socket, s) + let fut = protocol.serve_connection(socket, s) .map(|_| ()) .map_err(|err| error!("no_proto error: {}", err)); handle.spawn(fut); @@ -702,7 +606,7 @@ impl Server // // When we get a shutdown signal (`Ok`) then we drop the TCP listener to // stop accepting incoming connections. - match core.run(shutdown_signal.select(srv)) { + match reactor.run(shutdown_signal.select(srv)) { Ok(((), _incoming)) => {} Err((e, _other)) => return Err(e.into()), } @@ -716,121 +620,184 @@ impl Server // here have been destroyed. let timeout = try!(Timeout::new(shutdown_timeout, &handle)); let wait = WaitUntilZero { info: info.clone() }; - match core.run(wait.select(timeout)) { + match reactor.run(wait.select(timeout)) { Ok(_) => Ok(()), Err((e, _)) => Err(e.into()) } } } -impl Future for Server - where S: NewService, Error = ::Error> + 'static, - B: Stream + 'static, - B::Item: AsRef<[u8]>, +impl> fmt::Debug for Server +where B::Item: AsRef<[u8]> { - type Item = (); - type Error = (); + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Server") + .field("reactor", &"...") + .field("listener", &self.listener) + .field("new_service", &self.new_service) + .field("protocol", &self.protocol) + .finish() + } +} - fn poll(&mut self) -> Poll { - if let Reactor::Core(_) = self.reactor { - panic!("Server owns its core, use `Server::run()` to run the service!") +// ===== impl Serve ===== + +impl Serve { + /* + /// Spawn all incoming connections onto the provide executor. + pub fn spawn_all(self, executor: E) -> SpawnAll { + SpawnAll { + executor: executor, + serve: self, } + } + */ - loop { - match self.listener.accept() { - Ok((socket, addr)) => { - // TODO: use the NotifyService - match self.new_service.new_service() { - Ok(srv) => self.protocol.bind_connection(&self.handle(), - socket, - addr, - srv), - Err(e) => debug!("internal error: {:?}", e), - } - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady), - Err(e) => debug!("internal error: {:?}", e), - } + /// Get a reference to the incoming stream. + #[inline] + pub fn incoming_ref(&self) -> &I { + &self.incoming + } +} + +impl Stream for Serve +where + I: Stream, + I::Item: AsyncRead + AsyncWrite, + S: NewService, Error=::Error>, + B: Stream, + B::Item: AsRef<[u8]>, +{ + type Item = Connection; + type Error = ::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + if let Some(io) = try_ready!(self.incoming.poll()) { + let service = self.new_service.new_service()?; + Ok(Async::Ready(Some(self.protocol.serve_connection(io, service)))) + } else { + Ok(Async::Ready(None)) } } } -impl Future for ServerFuture - where F: Future, - S: NewService, Error = ::Error> + 'static, - B: Stream + 'static, - B::Item: AsRef<[u8]>, +// ===== impl SpawnAll ===== + +/* +impl Future for SpawnAll +where + I: Stream, + I::Item: AsyncRead + AsyncWrite, + S: NewService, Error=::Error>, + B: Stream, + B::Item: AsRef<[u8]>, + //E: Executor>, { type Item = (); - type Error = (); + type Error = ::Error; fn poll(&mut self) -> Poll { loop { - if let Some(ref mut shutdown) = self.shutdown { - match shutdown.poll() { - Ok(Async::Ready(_)) => return Ok(Async::Ready(())), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err((e, _)) => debug!("internal error: {:?}", e), - } - } else if let Ok(Async::Ready(())) = self.shutdown_signal.poll() { - match Timeout::new(self.server.shutdown_timeout, &self.server.handle()) { - Ok(timeout) => { - let wait = WaitUntilZero { info: self.info.clone() }; - self.shutdown = Some(wait.select(timeout)) - }, - Err(e) => debug!("internal error: {:?}", e), - } - } else { - match self.server.listener.accept() { - Ok((socket, addr)) => { - match self.server.new_service.new_service() { - Ok(inner_srv) => { - let srv = NotifyService { - inner: inner_srv, - info: Rc::downgrade(&self.info), - }; - self.info.borrow_mut().active += 1; - self.server.protocol.bind_connection(&self.server.handle(), - socket, - addr, - srv) - }, - Err(e) => debug!("internal error: {:?}", e), + if let Some(conn) = try_ready!(self.serve.poll()) { + let fut = conn + .map(|_| ()) + .map_err(|err| debug!("conn error: {}", err)); + match self.executor.execute(fut) { + Ok(()) => (), + Err(err) => match err.kind() { + ExecuteErrorKind::NoCapacity => { + debug!("SpawnAll::poll; executor no capacity"); + // continue loop + }, + ExecuteErrorKind::Shutdown | _ => { + debug!("SpawnAll::poll; executor shutdown"); + return Ok(Async::Ready(())) } - }, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady), - Err(e) => debug!("internal error: {:?}", e), + } } + } else { + return Ok(Async::Ready(())) } } } } +*/ +// ===== impl Connection ===== -impl> fmt::Debug for Server -where B::Item: AsRef<[u8]> +impl Future for Connection +where S: Service, Error = ::Error> + 'static, + I: AsyncRead + AsyncWrite + 'static, + B: Stream + 'static, + B::Item: AsRef<[u8]>, { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Server") - .field("reactor", &"...") - .field("listener", &self.listener) - .field("new_service", &self.new_service) - .field("protocol", &self.protocol) - .finish() + type Item = self::unnameable::Opaque; + type Error = ::Error; + + fn poll(&mut self) -> Poll { + try_ready!(self.conn.poll()); + Ok(self::unnameable::opaque().into()) } } -impl > fmt::Debug for ServerFuture -where B::Item: AsRef<[u8]>, -F: Future +impl fmt::Debug for Connection +where + S: HyperService, + S::ResponseBody: Stream, + ::Item: AsRef<[u8]>, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("ServerFuture") - .field("server", &self.server) - .field("info", &"...") - .field("shutdown_signal", &"...") - .field("shutdown", &"...") - .finish() + f.debug_struct("Connection") + .finish() + } +} + +mod unnameable { + // This type is specifically not exported outside the crate, + // so no one can actually name the type. With no methods, we make no + // promises about this type. + // + // All of that to say we can eventually replace the type returned + // to something else, and it would not be a breaking change. + // + // We may want to eventually yield the `T: AsyncRead + AsyncWrite`, which + // doesn't have a `Debug` bound. So, this type can't implement `Debug` + // either, so the type change doesn't break people. + #[allow(missing_debug_implementations)] + pub struct Opaque { + _inner: (), + } + + pub fn opaque() -> Opaque { + Opaque { + _inner: (), + } + } +} + +// ===== impl AddrStream ===== + +impl AddrStream { + /// Get the local address bound to this listener. + pub fn local_addr(&self) -> SocketAddr { + self.addr + } +} + +impl Stream for AddrStream { + type Item = TcpStream; + type Error = ::std::io::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + match self.listener.accept() { + Ok((socket, _addr)) => { + return Ok(Async::Ready(Some(socket))); + }, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady), + Err(e) => debug!("internal error: {:?}", e), + } + } } } @@ -889,3 +856,55 @@ impl Future for WaitUntilZero { } } } + +mod hyper_service { + use super::{Request, Response, Service, Stream}; + /// A "trait alias" for any type that implements `Service` with hyper's + /// Request, Response, and Error types, and a streaming body. + /// + /// There is an auto implementation inside hyper, so no one can actually + /// implement this trait. It simply exists to reduce the amount of generics + /// needed. + pub trait HyperService: Service + Sealed { + #[doc(hidden)] + type ResponseBody; + #[doc(hidden)] + type Sealed: Sealed2; + } + + pub trait Sealed {} + pub trait Sealed2 {} + + #[allow(missing_debug_implementations)] + pub struct Opaque { + _inner: (), + } + + impl Sealed2 for Opaque {} + + impl Sealed for S + where + S: Service< + Request=Request, + Response=Response, + Error=::Error, + >, + B: Stream, + B::Item: AsRef<[u8]>, + {} + + impl HyperService for S + where + S: Service< + Request=Request, + Response=Response, + Error=::Error, + >, + S: Sealed, + B: Stream, + B::Item: AsRef<[u8]>, + { + type ResponseBody = B; + type Sealed = Opaque; + } +} diff --git a/tests/server.rs b/tests/server.rs index c5ec8a8606..c1da03accc 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -524,7 +524,7 @@ fn no_proto_empty_parse_eof_does_not_return_error() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let (socket, _) = item.unwrap(); - Http::new().no_proto(socket, HelloWorld) + Http::::new().serve_connection(socket, HelloWorld) }); core.run(fut).unwrap(); @@ -546,7 +546,7 @@ fn no_proto_nonempty_parse_eof_returns_error() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let (socket, _) = item.unwrap(); - Http::new().no_proto(socket, HelloWorld) + Http::::new().serve_connection(socket, HelloWorld) .map(|_| ()) });