Skip to content

Commit

Permalink
feat(server): add Server::run_threads to run on multiple threads
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed May 5, 2018
1 parent eaa22cd commit 8b644c1
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ iovec = "0.1"
language-tags = "0.2"
log = "0.4"
mime = "0.3.2"
net2 = "0.2"
percent-encoding = "1.0"
relay = "0.1"
time = "0.1"
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ extern crate iovec;
extern crate language_tags;
#[macro_use] extern crate log;
pub extern crate mime;
extern crate net2;
#[macro_use] extern crate percent_encoding;
extern crate relay;
extern crate time;
Expand Down
89 changes: 87 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ use std::io;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use futures::task::{self, Task};
use futures::future::{self};
use futures::{Future, Stream, Poll, Async};
use net2;

#[cfg(feature = "compat")]
use http;
Expand Down Expand Up @@ -60,7 +63,7 @@ pub struct Http<B = ::Chunk> {
keep_alive: bool,
pipeline: bool,
sleep_on_errors: bool,
_marker: PhantomData<B>,
_marker: PhantomData<fn() -> B>,
}

/// An instance of a server created through `Http::bind`.
Expand Down Expand Up @@ -178,7 +181,7 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
{
let core = try!(Core::new());
let handle = core.handle();
let listener = try!(TcpListener::bind(addr, &handle));
let listener = try!(thread_listener(addr, &handle));

Ok(Server {
new_service: new_service,
Expand Down Expand Up @@ -445,6 +448,88 @@ impl<S, B> Server<S, B>
}
}


impl<S, B> Server<S, B>
where S: NewService<Request = Request, Response = Response<B>, Error = ::Error> + Send + Sync + 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
/// Run the server on multiple threads.
#[cfg(unix)]
pub fn run_threads(self, threads: usize) {
assert!(threads > 0, "threads must be more than 0");

let Server {
protocol,
new_service,
reactor,
listener,
shutdown_timeout,
} = self;

let new_service = Arc::new(new_service);
let addr = listener.local_addr().unwrap();

let threads = (1..threads).map(|i| {
let protocol = protocol.clone();
let new_service = new_service.clone();
thread::Builder::new()
.name(format!("hyper-server-thread-{}", i))
.spawn(move || {
let reactor = Core::new().unwrap();
let listener = thread_listener(&addr, &reactor.handle()).unwrap();
let srv = Server {
protocol,
new_service,
reactor,
listener,
shutdown_timeout,
};
srv.run().unwrap();
})
.unwrap()
}).collect::<Vec<_>>();

let srv = Server {
protocol,
new_service,
reactor,
listener,
shutdown_timeout,
};
srv.run().unwrap();

for thread in threads {
thread.join().unwrap();
}
}
}

fn thread_listener(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
let listener = match *addr {
SocketAddr::V4(_) => try!(net2::TcpBuilder::new_v4()),
SocketAddr::V6(_) => try!(net2::TcpBuilder::new_v6()),
};
try!(reuse_port(&listener));
try!(listener.reuse_address(true));
try!(listener.bind(addr));
listener.listen(1024).and_then(|l| {
TcpListener::from_listener(l, addr, handle)
})
}

#[cfg(unix)]
fn reuse_port(tcp: &net2::TcpBuilder) -> io::Result<()> {
use net2::unix::*;
try!(tcp.reuse_port(true));
Ok(())
}

#[cfg(not(unix))]
fn reuse_port(_tcp: &net2::TcpBuilder) -> io::Result<()> {
Ok(())
}

impl<S: fmt::Debug, B: Stream<Error=::Error>> fmt::Debug for Server<S, B>
where B::Item: AsRef<[u8]>
{
Expand Down

0 comments on commit 8b644c1

Please sign in to comment.