Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[http server]: use tokio::spawn internally in HttpServer::start and return StopHandle #402

Merged
merged 15 commits into from
Oct 11, 2021
Merged
4 changes: 2 additions & 2 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ trait RequestBencher {

fn http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
let (url, _handle) = rt.block_on(helpers::http_server());
let client = Arc::new(HttpClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url).unwrap());
run_round_trip(&rt, crit, client.clone(), "http_round_trip", Self::REQUEST_TYPE);
run_concurrent_round_trip(&rt, crit, client, "http_concurrent_round_trip", Self::REQUEST_TYPE);
Expand All @@ -91,7 +91,7 @@ trait RequestBencher {

fn batched_http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
let (url, _handle) = rt.block_on(helpers::http_server());
let client = Arc::new(HttpClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url).unwrap());
run_round_trip_with_batch(&rt, crit, client, "http batch requests", Self::REQUEST_TYPE);
}
Expand Down
23 changes: 10 additions & 13 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures_channel::oneshot;
use jsonrpsee::{
http_server::HttpServerBuilder,
http_server::{HttpServerBuilder, HttpStopHandle},
ws_server::{RpcModule, WsServerBuilder},
};

Expand All @@ -10,18 +10,15 @@ pub(crate) const SUB_METHOD_NAME: &str = "sub";
pub(crate) const UNSUB_METHOD_NAME: &str = "unsub";

/// Run jsonrpsee HTTP server for benchmarks.
pub async fn http_server() -> String {
let (server_started_tx, server_started_rx) = oneshot::channel();
tokio::spawn(async move {
let server =
HttpServerBuilder::default().max_request_body_size(u32::MAX).build("127.0.0.1:0".parse().unwrap()).unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| async { Ok("lo") }).unwrap();
server_started_tx.send(server.local_addr().unwrap()).unwrap();
server.start(module).await
});
format!("http://{}", server_started_rx.await.unwrap())
pub async fn http_server() -> (String, HttpStopHandle) {
let server =
HttpServerBuilder::default().max_request_body_size(u32::MAX).build("127.0.0.1:0".parse().unwrap()).unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| async { Ok("lo") }).unwrap();
let addr = server.local_addr().unwrap();
let handle = server.start(module).unwrap();
(format!("http://{}", addr), handle)
}

/// Run jsonrpsee WebSocket server for benchmarks.
Expand Down
10 changes: 5 additions & 5 deletions examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

use jsonrpsee::{
http_client::HttpClientBuilder,
http_server::{HttpServerBuilder, RpcModule},
http_server::{HttpServerBuilder, HttpStopHandle, RpcModule},
rpc_params,
types::traits::Client,
};
Expand All @@ -36,7 +36,7 @@ use std::net::SocketAddr;
async fn main() -> anyhow::Result<()> {
env_logger::init();

let server_addr = run_server().await?;
let (server_addr, _handle) = run_server().await?;
let url = format!("http://{}", server_addr);

let client = HttpClientBuilder::default().build(url)?;
Expand All @@ -47,12 +47,12 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
async fn run_server() -> anyhow::Result<(SocketAddr, HttpStopHandle)> {
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse()?)?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;

let addr = server.local_addr()?;
tokio::spawn(server.start(module));
Ok(addr)
let stop_handle = server.start(module)?;
Ok((addr, stop_handle))
}
13 changes: 8 additions & 5 deletions examples/proc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use jsonrpsee::{
proc_macros::rpc,
types::{async_trait, error::Error, Subscription},
ws_client::WsClientBuilder,
ws_server::{SubscriptionSink, WsServerBuilder},
ws_server::{SubscriptionSink, WsServerBuilder, WsStopHandle},
RpcModule,
};
use std::net::SocketAddr;

Expand Down Expand Up @@ -74,7 +75,7 @@ impl RpcServer<ExampleHash, ExampleStorageKey> for RpcServerImpl {
async fn main() -> anyhow::Result<()> {
env_logger::init();

let server_addr = run_server().await?;
let (server_addr, _handle) = run_server().await?;
let url = format!("ws://{}", server_addr);

let client = WsClientBuilder::default().build(&url).await?;
Expand All @@ -87,10 +88,12 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
async fn run_server() -> anyhow::Result<(SocketAddr, WsStopHandle)> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("state_getPairs", |_, _| Ok(vec![1, 2, 3]))?;

let addr = server.local_addr()?;
server.start(RpcServerImpl.into_rpc())?;
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
Ok(addr)
let handle = server.start(module)?;
Ok((addr, handle))
}
82 changes: 44 additions & 38 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
// DEALINGS IN THE SOFTWARE.

use crate::{response, AccessControl};
use futures_channel::mpsc;
use futures_channel::{mpsc, oneshot};
use futures_util::future::join_all;
use futures_util::{lock::Mutex, stream::StreamExt, SinkExt};
use futures_util::stream::StreamExt;
use hyper::{
server::{conn::AddrIncoming, Builder as HyperBuilder},
service::{make_service_fn, service_fn},
Expand All @@ -50,7 +50,6 @@ use socket2::{Domain, Socket, Type};
use std::{
cmp,
net::{SocketAddr, TcpListener},
sync::Arc,
};

/// Builder to create JSON-RPC HTTP server.
Expand All @@ -60,6 +59,8 @@ pub struct Builder {
resources: Resources,
max_request_body_size: u32,
keep_alive: bool,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
}

impl Builder {
Expand Down Expand Up @@ -91,6 +92,13 @@ impl Builder {
Ok(self)
}

/// Configure a custom [`tokio::runtime::Handle`] to run the server on.
///
/// Default: [`tokio::spawn`]
pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might want to re-export Handle but I decided not because the library is tightly-coupled to tokio and if the wrong tokio version is used a compile error is more likely than a runtime error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I was thinking about adding something like this for ws for testing, but that ended ultimately not being necessary so I didn't bother. Should we add it there too?

Copy link
Member Author

@niklasad1 niklasad1 Oct 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have already added it to the WS server if that's ok? :)

self.tokio_runtime = Some(rt);
}

/// Finalizes the configuration of the server.
pub fn build(self, addr: SocketAddr) -> Result<Server, Error> {
let domain = Domain::for_address(addr);
Expand All @@ -108,15 +116,13 @@ impl Builder {

let listener = hyper::Server::from_tcp(listener)?;

let stop_pair = mpsc::channel(1);
Ok(Server {
listener,
local_addr,
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
stop_pair,
stop_handle: Arc::new(Mutex::new(())),
resources: self.resources,
tokio_runtime: self.tokio_runtime,
})
}
}
Expand All @@ -128,26 +134,30 @@ impl Default for Builder {
resources: Resources::default(),
access_control: AccessControl::default(),
keep_alive: true,
tokio_runtime: None,
}
}
}

/// Handle used to stop the running server.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct StopHandle {
stop_sender: mpsc::Sender<()>,
stop_handle: Arc<Mutex<()>>,
stop_sender: oneshot::Sender<()>,
stop_handle: Option<tokio::task::JoinHandle<()>>,
}

impl StopHandle {
/// Requests server to stop. Returns an error if server was already stopped.
pub async fn stop(&mut self) -> Result<(), Error> {
self.stop_sender.send(()).await.map_err(|_| Error::AlreadyStopped)
}

/// Blocks indefinitely until the server is stopped.
pub async fn wait_for_stop(&self) {
self.stop_handle.lock().await;
///
/// Returns a future that can be awaited for when the server shuts down.
pub fn stop(self) -> Result<tokio::task::JoinHandle<()>, Error> {
let sender = self.stop_sender;
let mut handle = self.stop_handle;
let stop = sender.send(()).and_then(|_| Ok(handle.take()));
match stop {
Ok(Some(handle)) => Ok(handle),
_ => Err(Error::AlreadyStopped),
}
}
}

Expand All @@ -162,12 +172,10 @@ pub struct Server {
max_request_body_size: u32,
/// Access control
access_control: AccessControl,
/// Pair of channels to stop the server.
stop_pair: (mpsc::Sender<()>, mpsc::Receiver<()>),
/// Stop handle that indicates whether server has been stopped.
stop_handle: Arc<Mutex<()>>,
/// Tracker for currently used resources on the server
resources: Resources,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
}

impl Server {
Expand All @@ -176,21 +184,13 @@ impl Server {
self.local_addr.ok_or_else(|| Error::Custom("Local address not found".into()))
}

/// Returns the handle to stop the running server.
pub fn stop_handle(&self) -> StopHandle {
StopHandle { stop_sender: self.stop_pair.0.clone(), stop_handle: self.stop_handle.clone() }
}

/// Start the server.
pub async fn start(self, methods: impl Into<Methods>) -> Result<(), Error> {
// Lock the stop mutex so existing stop handles can wait for server to stop.
// It will be unlocked once this function returns.
let _stop_handle = self.stop_handle.lock().await;

pub fn start(mut self, methods: impl Into<Methods>) -> Result<StopHandle, Error> {
let max_request_body_size = self.max_request_body_size;
let access_control = self.access_control;
let (tx, rx) = oneshot::channel();
let listener = self.listener;
let resources = self.resources;
let mut stop_receiver = self.stop_pair.1;
let methods = methods.into().initialize_resources(&resources)?;

let make_service = make_service_fn(move |_| {
Expand Down Expand Up @@ -288,13 +288,19 @@ impl Server {
}
});

let server = self.listener.serve(make_service);
server
.with_graceful_shutdown(async move {
stop_receiver.next().await;
})
.await
.map_err(Into::into)
let rt = match self.tokio_runtime.take() {
Some(rt) => rt,
None => tokio::runtime::Handle::current(),
};

let handle = rt.spawn(async move {
let server = listener.serve(make_service);
server.with_graceful_shutdown(async move {
rx.await.ok();
});
});

Ok(StopHandle { stop_handle: Some(handle), stop_sender: tx })
}
}

Expand Down
Loading