Skip to content

Commit

Permalink
feat(server): re-design Server as higher-level API
Browse files Browse the repository at this point in the history
The `hyper::Server` is now a proper higher-level API for running HTTP
servers. There is a related `hyper::server::Builder` type, to construct
a `Server`. All other types (`Http`, `Serve`, etc) were moved into the
"lower-level" `hyper::server::conn` module.

The `Server` is a `Future` representing a listening HTTP server. Options
needed to build one are set on the `Builder`.

As `Server` is just a `Future`, it no longer owns a thread-blocking
executor, and can thus be run next to other servers, clients, or
what-have-you.

Closes #1322
Closes #1263

BREAKING CHANGE: The `Server` is no longer created from `Http::bind`,
  nor is it `run`. It is a `Future` that must be polled by an
  `Executor`.

  The `hyper::server::Http` type has move to
  `hyper::server::conn::Http`.
  • Loading branch information
seanmonstar committed Apr 16, 2018
1 parent 35c38cb commit c497450
Show file tree
Hide file tree
Showing 16 changed files with 790 additions and 825 deletions.
18 changes: 12 additions & 6 deletions benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ use tokio::runtime::Runtime;
use tokio::net::TcpListener;

use hyper::{Body, Method, Request, Response};
use hyper::server::Http;
use hyper::client::HttpConnector;
use hyper::server::conn::Http;


#[bench]
fn get_one_at_a_time(b: &mut test::Bencher) {
let mut rt = Runtime::new().unwrap();
let addr = spawn_hello(&mut rt);

let client = hyper::Client::configure()
.build_with_executor(&rt.reactor(), rt.executor());
let connector = HttpConnector::new_with_handle(1, rt.reactor().clone());
let client = hyper::Client::builder()
.executor(rt.executor())
.build::<_, Body>(connector);

let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap();

Expand All @@ -43,8 +46,10 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
let mut rt = Runtime::new().unwrap();
let addr = spawn_hello(&mut rt);

let client = hyper::Client::configure()
.build_with_executor(&rt.reactor(), rt.executor());
let connector = HttpConnector::new_with_handle(1, rt.reactor().clone());
let client = hyper::Client::builder()
.executor(rt.executor())
.build::<_, Body>(connector);

let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap();

Expand All @@ -71,7 +76,7 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr {
let listener = TcpListener::bind(&addr).unwrap();
let addr = listener.local_addr().unwrap();

let http = Http::<hyper::Chunk>::new();
let http = Http::new();

let service = const_service(service_fn(|req: Request<Body>| {
req.into_body()
Expand All @@ -81,6 +86,7 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr {
})
}));

// Specifically only accept 1 connection.
let srv = listener.incoming()
.into_future()
.map_err(|(e, _inc)| panic!("accept error: {}", e))
Expand Down
22 changes: 13 additions & 9 deletions benches/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,28 @@ use std::sync::mpsc;
use futures::{future, stream, Future, Stream};
use futures::sync::oneshot;

use hyper::{Body, Request, Response};
use hyper::{Body, Request, Response, Server};
use hyper::server::Service;

macro_rules! bench_server {
($b:ident, $header:expr, $body:expr) => ({
let _ = pretty_env_logger::try_init();
let (_until_tx, until_rx) = oneshot::channel();
let (_until_tx, until_rx) = oneshot::channel::<()>();
let addr = {
let (addr_tx, addr_rx) = mpsc::channel();
::std::thread::spawn(move || {
let addr = "127.0.0.1:0".parse().unwrap();
let srv = hyper::server::Http::new().bind(&addr, || Ok(BenchPayload {
header: $header,
body: $body,
})).unwrap();
let addr = srv.local_addr().unwrap();
addr_tx.send(addr).unwrap();
tokio::run(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e)));
let srv = Server::bind(&addr)
.serve(|| Ok(BenchPayload {
header: $header,
body: $body,
}));
addr_tx.send(srv.local_addr()).unwrap();
let fut = srv
.map_err(|e| panic!("server error: {}", e))
.select(until_rx.then(|_| Ok(())))
.then(|_| Ok(()));
tokio::run(fut);
});

addr_rx.recv().unwrap()
Expand Down
18 changes: 8 additions & 10 deletions examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,27 @@ extern crate pretty_env_logger;
extern crate tokio;

use futures::Future;
use futures::future::lazy;

use hyper::{Body, Response};
use hyper::server::{Http, const_service, service_fn};
use hyper::server::{Server, const_service, service_fn};

static PHRASE: &'static [u8] = b"Hello World!";

fn main() {
pretty_env_logger::init();

let addr = ([127, 0, 0, 1], 3000).into();

let new_service = const_service(service_fn(|_| {
//TODO: when `!` is stable, replace error type
Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE)))
}));

tokio::run(lazy(move || {
let server = Http::new()
.sleep_on_errors(true)
.bind(&addr, new_service)
.unwrap();
let server = Server::bind(&addr)
.serve(new_service)
.map_err(|e| eprintln!("server error: {}", e));

println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
println!("Listening on http://{}", addr);

tokio::run(server);
}
30 changes: 14 additions & 16 deletions examples/multi_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ extern crate futures;
extern crate pretty_env_logger;
extern crate tokio;

use futures::{Future, Stream};
use futures::{Future};
use futures::future::{FutureResult, lazy};

use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};

static INDEX1: &'static [u8] = b"The 1st service!";
static INDEX2: &'static [u8] = b"The 2nd service!";
Expand Down Expand Up @@ -40,25 +40,23 @@ impl Service for Srv {

fn main() {
pretty_env_logger::init();
let addr1 = "127.0.0.1:1337".parse().unwrap();
let addr2 = "127.0.0.1:1338".parse().unwrap();

let addr1 = ([127, 0, 0, 1], 1337).into();
let addr2 = ([127, 0, 0, 1], 1338).into();

tokio::run(lazy(move || {
let srv1 = Http::new().serve_addr(&addr1, || Ok(Srv(INDEX1))).unwrap();
let srv2 = Http::new().serve_addr(&addr2, || Ok(Srv(INDEX2))).unwrap();
let srv1 = Server::bind(&addr1)
.serve(|| Ok(Srv(INDEX1)))
.map_err(|e| eprintln!("server 1 error: {}", e));

println!("Listening on http://{}", srv1.incoming_ref().local_addr());
println!("Listening on http://{}", srv2.incoming_ref().local_addr());
let srv2 = Server::bind(&addr2)
.serve(|| Ok(Srv(INDEX2)))
.map_err(|e| eprintln!("server 2 error: {}", e));

tokio::spawn(srv1.for_each(move |conn| {
tokio::spawn(conn.map_err(|err| println!("srv1 error: {:?}", err)));
Ok(())
}).map_err(|_| ()));
println!("Listening on http://{} and http://{}", addr1, addr2);

tokio::spawn(srv2.for_each(move |conn| {
tokio::spawn(conn.map_err(|err| println!("srv2 error: {:?}", err)));
Ok(())
}).map_err(|_| ()));
tokio::spawn(srv1);
tokio::spawn(srv2);

Ok(())
}));
Expand Down
16 changes: 8 additions & 8 deletions examples/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ extern crate tokio;
extern crate url;

use futures::{Future, Stream};
use futures::future::lazy;

use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};

use std::collections::HashMap;
use url::form_urlencoded;
Expand Down Expand Up @@ -96,11 +95,12 @@ impl Service for ParamExample {

fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();

tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
let addr = ([127, 0, 0, 1], 1337).into();

let server = Server::bind(&addr)
.serve(|| Ok(ParamExample))
.map_err(|e| eprintln!("server error: {}", e));

tokio::run(server);
}
16 changes: 9 additions & 7 deletions examples/send_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ extern crate pretty_env_logger;
extern crate tokio;

use futures::{Future/*, Sink*/};
use futures::future::lazy;
use futures::sync::oneshot;

use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};

use std::fs::File;
use std::io::{self, copy/*, Read*/};
Expand Down Expand Up @@ -138,11 +137,14 @@ impl Service for ResponseExamples {

fn main() {
pretty_env_logger::init();

let addr = "127.0.0.1:1337".parse().unwrap();

tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
let server = Server::bind(&addr)
.serve(|| Ok(ResponseExamples))
.map_err(|e| eprintln!("server error: {}", e));

println!("Listening on http://{}", addr);

tokio::run(server);
}
19 changes: 11 additions & 8 deletions examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ extern crate pretty_env_logger;
extern crate tokio;

use futures::Future;
use futures::future::{FutureResult, lazy};
use futures::future::{FutureResult};

use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};

static INDEX: &'static [u8] = b"Try POST /echo";

Expand Down Expand Up @@ -41,11 +41,14 @@ impl Service for Echo {

fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();

tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
let addr = ([127, 0, 0, 1], 1337).into();

let server = Server::bind(&addr)
.serve(|| Ok(Echo))
.map_err(|e| eprintln!("server error: {}", e));

println!("Listening on http://{}", addr);

tokio::run(server);
}
14 changes: 8 additions & 6 deletions examples/web_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::future::lazy;

use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode};
use hyper::client::HttpConnector;
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};

#[allow(unused, deprecated)]
use std::ascii::AsciiExt;
Expand Down Expand Up @@ -75,15 +75,17 @@ impl Service for ResponseExamples {

fn main() {
pretty_env_logger::init();

let addr = "127.0.0.1:1337".parse().unwrap();

tokio::run(lazy(move || {
let client = Client::new();
let serve = Http::new().serve_addr(&addr, move || Ok(ResponseExamples(client.clone()))).unwrap();
println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr());
let server = Server::bind(&addr)
.serve(move || Ok(ResponseExamples(client.clone())))
.map_err(|e| eprintln!("server error: {}", e));

println!("Listening on http://{}", addr);

serve.map_err(|_| ()).for_each(move |conn| {
tokio::spawn(conn.map_err(|err| println!("serve error: {:?}", err)))
})
server
}));
}
8 changes: 4 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ impl Error {
Error::new(Kind::Io, Some(cause.into()))
}

pub(crate) fn new_listen(err: io::Error) -> Error {
Error::new(Kind::Listen, Some(err.into()))
pub(crate) fn new_listen<E: Into<Cause>>(cause: E) -> Error {
Error::new(Kind::Listen, Some(cause.into()))
}

pub(crate) fn new_accept(err: io::Error) -> Error {
Error::new(Kind::Accept, Some(Box::new(err)))
pub(crate) fn new_accept<E: Into<Cause>>(cause: E) -> Error {
Error::new(Kind::Accept, Some(cause.into()))
}

pub(crate) fn new_connect<E: Into<Cause>>(cause: E) -> Error {
Expand Down
15 changes: 13 additions & 2 deletions src/proto/h1/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,19 @@ use tokio_io::{AsyncRead, AsyncWrite};

use proto::{Http1Transaction, MessageHead};

const INIT_BUFFER_SIZE: usize = 8192;
pub const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
/// The initial buffer size allocated before trying to read from IO.
pub(crate) const INIT_BUFFER_SIZE: usize = 8192;

/// The default maximum read buffer size. If the buffer gets this big and
/// a message is still not complete, a `TooLarge` error is triggered.
// Note: if this changes, update server::conn::Http::max_buf_size docs.
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;

/// The maximum number of distinct `Buf`s to hold in a list before requiring
/// a flush. Only affects when the buffer strategy is to queue buffers.
///
/// Note that a flush can happen before reaching the maximum. This simply
/// forces a flush if the queue gets this big.
const MAX_BUF_LIST_BUFFERS: usize = 16;

pub struct Buffered<T, B> {
Expand Down
Loading

0 comments on commit c497450

Please sign in to comment.