Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http server): add new builder APIs build_from_tcp and build_from_hyper #719

Merged
merged 16 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::
.max_request_body_size(u32::MAX)
.custom_tokio_runtime(handle)
.build("127.0.0.1:0")
.await
.unwrap();

let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| async { Ok("lo") }).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion examples/cors_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ async fn main() -> anyhow::Result<()> {
async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
let acl = AccessControlBuilder::new().allow_all_headers().allow_all_origins().allow_all_hosts().build();

let server = HttpServerBuilder::default().set_access_control(acl).build("127.0.0.1:0".parse::<SocketAddr>()?)?;
let server =
HttpServerBuilder::default().set_access_control(acl).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;

let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| {
Expand Down
2 changes: 1 addition & 1 deletion examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse::<SocketAddr>()?)?;
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse::<SocketAddr>()?).await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;

Expand Down
2 changes: 1 addition & 1 deletion examples/middleware_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
let server = HttpServerBuilder::new().set_middleware(Timings).build("127.0.0.1:0")?;
let server = HttpServerBuilder::new().set_middleware(Timings).build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
let addr = server.local_addr()?;
Expand Down
2 changes: 1 addition & 1 deletion http-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ globset = "0.4"
lazy_static = "1.4"
tracing = "0.1"
serde_json = "1"
socket2 = "0.4"
tokio = { version = "1.8", features = ["rt-multi-thread", "macros"] }
unicase = "2.6.0"

[dev-dependencies]
env_logger = "0.9.0"
jsonrpsee-test-utils = { path = "../test-utils" }
jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
socket2 = "0.4"
170 changes: 110 additions & 60 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

use std::cmp;
use std::future::Future;
use std::net::{SocketAddr, TcpListener, ToSocketAddrs};
use std::net::{SocketAddr, TcpListener as StdTcpListener};
use std::pin::Pin;
use std::task::{Context, Poll};

Expand All @@ -48,7 +48,7 @@ use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_types::error::ErrorCode;
use jsonrpsee_types::{Id, Notification, Params, Request};
use serde_json::value::RawValue;
use socket2::{Domain, Socket, Type};
use tokio::net::{TcpListener, ToSocketAddrs};

/// Builder to create JSON-RPC HTTP server.
#[derive(Debug)]
Expand All @@ -57,7 +57,6 @@ pub struct Builder<M = ()> {
resources: Resources,
max_request_body_size: u32,
max_response_body_size: u32,
keep_alive: bool,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
middleware: M,
Expand All @@ -70,7 +69,6 @@ impl Default for Builder {
max_response_body_size: TEN_MB_SIZE_BYTES,
resources: Resources::default(),
access_control: AccessControl::default(),
keep_alive: true,
tokio_runtime: None,
middleware: (),
}
Expand Down Expand Up @@ -116,7 +114,6 @@ impl<M> Builder<M> {
max_response_body_size: self.max_response_body_size,
resources: self.resources,
access_control: self.access_control,
keep_alive: self.keep_alive,
tokio_runtime: self.tokio_runtime,
middleware,
}
Expand All @@ -140,18 +137,10 @@ impl<M> Builder<M> {
self
}

/// Enables or disables HTTP keep-alive.
///
/// Default is true.
pub fn keep_alive(mut self, keep_alive: bool) -> Self {
self.keep_alive = keep_alive;
self
}

/// Register a new resource kind. Errors if `label` is already registered, or if the number of
/// registered resources on this server instance would exceed 8.
///
/// See the module documentation for [`resurce_limiting`](../jsonrpsee_utils/server/resource_limiting/index.html#resource-limiting)
/// See the module documentation for [`resource_limiting`](../jsonrpsee_utils/server/resource_limiting/index.html#resource-limiting)
/// for details.
pub fn register_resource(mut self, label: &'static str, capacity: u16, default: u16) -> Result<Self, Error> {
self.resources.register(label, capacity, default)?;
Expand All @@ -167,6 +156,97 @@ impl<M> Builder<M> {
self
}

/// Finalizes the configuration of the server with customized TCP settings on the socket and on hyper.
///
/// ```rust
/// use jsonrpsee_http_server::HttpServerBuilder;
/// use socket2::{Domain, Socket, Type};
/// use std::net::TcpListener;
///
/// #[tokio::main]
/// async fn main() {
/// let addr = "127.0.0.1:0".parse().unwrap();
/// let domain = Domain::for_address(addr);
/// let socket = Socket::new(domain, Type::STREAM, None).unwrap();
/// socket.set_nonblocking(true).unwrap();
///
/// let address = addr.into();
/// socket.bind(&address).unwrap();
/// socket.listen(4096).unwrap();
///
/// let listener: TcpListener = socket.into();
/// let local_addr = listener.local_addr().ok();
///
/// // hyper does some settings on the provided socket, ensure that nothing breaks our "expected settings".
///
/// let listener = hyper::Server::from_tcp(listener)
/// .unwrap()
/// .tcp_sleep_on_accept_errors(true)
/// .tcp_keepalive(None)
/// .tcp_nodelay(true);
///
/// let server = HttpServerBuilder::new().build_from_hyper(listener, addr).unwrap();
/// }
/// ```
pub fn build_from_hyper(
self,
listener: hyper::server::Builder<AddrIncoming>,
local_addr: SocketAddr,
) -> Result<Server<M>, Error> {
Ok(Server {
listener,
local_addr: Some(local_addr),
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
})
}

/// Finalizes the configuration of the server with customized TCP settings on the socket.
/// Note, that [`hyper`] might overwrite some of the TCP settings on the socket
/// if you want full-control of socket settings use [`Builder::build_from_hyper`] instead.
///
/// ```rust
/// use jsonrpsee_http_server::HttpServerBuilder;
/// use socket2::{Domain, Socket, Type};
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() {
/// let addr = "127.0.0.1:0".parse().unwrap();
/// let domain = Domain::for_address(addr);
/// let socket = Socket::new(domain, Type::STREAM, None).unwrap();
/// socket.set_nonblocking(true).unwrap();
///
/// let address = addr.into();
/// socket.bind(&address).unwrap();
///
/// socket.listen(4096).unwrap();
///
/// let server = HttpServerBuilder::new().build_from_tcp(socket).unwrap();
/// }
/// ```
pub fn build_from_tcp(self, listener: impl Into<StdTcpListener>) -> Result<Server<M>, Error> {
let listener = listener.into();
let local_addr = listener.local_addr().ok();

let listener = hyper::Server::from_tcp(listener)?;

Ok(Server {
listener,
local_addr,
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
})
}

/// Finalizes the configuration of the server.
///
/// ```rust
Expand All @@ -178,56 +258,26 @@ impl<M> Builder<M> {
/// occupied_addr,
/// "127.0.0.1:0".parse().unwrap(),
/// ];
/// assert!(jsonrpsee_http_server::HttpServerBuilder::default().build(occupied_addr).is_err());
/// assert!(jsonrpsee_http_server::HttpServerBuilder::default().build(addrs).is_ok());
/// assert!(jsonrpsee_http_server::HttpServerBuilder::default().build(occupied_addr).await.is_err());
/// assert!(jsonrpsee_http_server::HttpServerBuilder::default().build(addrs).await.is_ok());
/// }
/// ```
pub fn build(self, addrs: impl ToSocketAddrs) -> Result<Server<M>, Error> {
let mut err: Option<Error> = None;

for addr in addrs.to_socket_addrs()? {
let (listener, local_addr) = match self.inner_builder(addr) {
Ok(res) => res,
Err(e) => {
err = Some(e);
continue;
}
};

return Ok(Server {
listener,
local_addr,
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
});
}

let err = err.unwrap_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "No address found").into());
Err(err)
}
pub async fn build(self, addrs: impl ToSocketAddrs) -> Result<Server<M>, Error> {
let listener = TcpListener::bind(addrs).await?.into_std()?;

fn inner_builder(
&self,
addr: SocketAddr,
) -> Result<(hyper::server::Builder<hyper::server::conn::AddrIncoming>, Option<SocketAddr>), Error> {
let domain = Domain::for_address(addr);
let socket = Socket::new(domain, Type::STREAM, None)?;
socket.set_nodelay(true)?;
socket.set_reuse_address(true)?;
socket.set_nonblocking(true)?;
socket.set_keepalive(self.keep_alive)?;
let address = addr.into();
socket.bind(&address)?;

socket.listen(1024)?;
let listener: TcpListener = socket.into();
let local_addr = listener.local_addr().ok();
let listener = hyper::Server::from_tcp(listener)?;
Ok((listener, local_addr))
let listener = hyper::Server::from_tcp(listener)?.tcp_nodelay(true);
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved

Ok(Server {
listener,
local_addr,
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
})
}
}

Expand Down
6 changes: 3 additions & 3 deletions http-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use jsonrpsee_test_utils::TimeoutFutureExt;
use serde_json::Value as JsonValue;

async fn server() -> (SocketAddr, ServerHandle) {
let server = HttpServerBuilder::default().build("127.0.0.1:0").unwrap();
let server = HttpServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let ctx = TestContext;
let mut module = RpcModule::new(ctx);
let addr = server.local_addr().unwrap();
Expand Down Expand Up @@ -410,7 +410,7 @@ async fn run_forever() {
async fn can_set_the_max_request_body_size() {
let addr = "127.0.0.1:0";
// Rejects all requests larger than 100 bytes
let server = HttpServerBuilder::default().max_request_body_size(100).build(addr).unwrap();
let server = HttpServerBuilder::default().max_request_body_size(100).build(addr).await.unwrap();
let mut module = RpcModule::new(());
module.register_method("anything", |_p, _cx| Ok("a".repeat(100))).unwrap();
let addr = server.local_addr().unwrap();
Expand All @@ -434,7 +434,7 @@ async fn can_set_the_max_request_body_size() {
async fn can_set_the_max_response_size() {
let addr = "127.0.0.1:0";
// Set the max response size to 100 bytes
let server = HttpServerBuilder::default().max_response_body_size(100).build(addr).unwrap();
let server = HttpServerBuilder::default().max_response_body_size(100).build(addr).await.unwrap();
let mut module = RpcModule::new(());
module.register_method("anything", |_p, _cx| Ok("a".repeat(101))).unwrap();
let addr = server.local_addr().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub async fn http_server() -> (SocketAddr, HttpServerHandle) {
}

pub async fn http_server_with_access_control(acl: AccessControl) -> (SocketAddr, HttpServerHandle) {
let server = HttpServerBuilder::default().set_access_control(acl).build("127.0.0.1:0").unwrap();
let server = HttpServerBuilder::default().set_access_control(acl).build("127.0.0.1:0").await.unwrap();
let mut module = RpcModule::new(());
let addr = server.local_addr().unwrap();
module.register_method("say_hello", |_, _| Ok("hello")).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion tests/tests/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ async fn http_server(module: RpcModule<()>, counter: Counter) -> Result<(SocketA
.register_resource("CPU", 6, 2)?
.register_resource("MEM", 10, 1)?
.set_middleware(counter)
.build("127.0.0.1:0")?;
.build("127.0.0.1:0")
.await?;

let addr = server.local_addr()?;
let handle = server.start(module)?;
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/proc_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ async fn multiple_blocking_calls_overlap() {

#[tokio::test]
async fn subscriptions_do_not_work_for_http_servers() {
let htserver = HttpServerBuilder::default().build("127.0.0.1:0").unwrap();
let htserver = HttpServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let addr = htserver.local_addr().unwrap();
let htserver_url = format!("http://{}", addr);
let _handle = htserver.start(RpcServerImpl.into_rpc()).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion tests/tests/resource_limiting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ async fn http_server(module: RpcModule<()>) -> Result<(SocketAddr, HttpServerHan
let server = HttpServerBuilder::default()
.register_resource("CPU", 6, 2)?
.register_resource("MEM", 10, 1)?
.build("127.0.0.1:0")?;
.build("127.0.0.1:0")
.await?;

let addr = server.local_addr()?;
let handle = server.start(module)?;
Expand Down