Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http server): add new builder APIs build_from_tcp and build_from_hyper #719

Merged
merged 16 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::
.max_request_body_size(u32::MAX)
.custom_tokio_runtime(handle)
.build("127.0.0.1:0")
.await
.unwrap();

let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| async { Ok("lo") }).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion examples/cors_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ async fn main() -> anyhow::Result<()> {
async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
let acl = AccessControlBuilder::new().allow_all_headers().allow_all_origins().allow_all_hosts().build();

let server = HttpServerBuilder::default().set_access_control(acl).build("127.0.0.1:0".parse::<SocketAddr>()?)?;
let server =
HttpServerBuilder::default().set_access_control(acl).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;

let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| {
Expand Down
2 changes: 1 addition & 1 deletion examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse::<SocketAddr>()?)?;
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse::<SocketAddr>()?).await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;

Expand Down
2 changes: 1 addition & 1 deletion examples/middleware_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
let server = HttpServerBuilder::new().set_middleware(Timings).build("127.0.0.1:0")?;
let server = HttpServerBuilder::new().set_middleware(Timings).build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
let addr = server.local_addr()?;
Expand Down
2 changes: 1 addition & 1 deletion http-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ globset = "0.4"
lazy_static = "1.4"
tracing = "0.1"
serde_json = "1"
socket2 = "0.4"
tokio = { version = "1.8", features = ["rt-multi-thread", "macros"] }
unicase = "2.6.0"

[dev-dependencies]
env_logger = "0.9.0"
jsonrpsee-test-utils = { path = "../test-utils" }
jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
socket2 = "0.4"
4 changes: 3 additions & 1 deletion http-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ pub use access_control::{
};
pub use jsonrpsee_core::server::rpc_module::RpcModule;
pub use jsonrpsee_types as types;
pub use server::{Builder as HttpServerBuilder, Server as HttpServer, ServerHandle as HttpServerHandle};
pub use server::{
Builder as HttpServerBuilder, HyperTcpConfig, Server as HttpServer, ServerHandle as HttpServerHandle,
};
pub use tracing;

#[cfg(test)]
Expand Down
146 changes: 87 additions & 59 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@

use std::cmp;
use std::future::Future;
use std::net::{SocketAddr, TcpListener, ToSocketAddrs};
use std::net::{SocketAddr, TcpListener as StdTcpListener};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use crate::response::{internal_error, malformed};
use crate::{response, AccessControl};
Expand All @@ -48,7 +49,7 @@ use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_types::error::ErrorCode;
use jsonrpsee_types::{Id, Notification, Params, Request};
use serde_json::value::RawValue;
use socket2::{Domain, Socket, Type};
use tokio::net::{TcpListener, ToSocketAddrs};

/// Builder to create JSON-RPC HTTP server.
#[derive(Debug)]
Expand All @@ -57,7 +58,6 @@ pub struct Builder<M = ()> {
resources: Resources,
max_request_body_size: u32,
max_response_body_size: u32,
keep_alive: bool,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
middleware: M,
Expand All @@ -70,7 +70,6 @@ impl Default for Builder {
max_response_body_size: TEN_MB_SIZE_BYTES,
resources: Resources::default(),
access_control: AccessControl::default(),
keep_alive: true,
tokio_runtime: None,
middleware: (),
}
Expand Down Expand Up @@ -116,7 +115,6 @@ impl<M> Builder<M> {
max_response_body_size: self.max_response_body_size,
resources: self.resources,
access_control: self.access_control,
keep_alive: self.keep_alive,
tokio_runtime: self.tokio_runtime,
middleware,
}
Expand All @@ -140,18 +138,10 @@ impl<M> Builder<M> {
self
}

/// Enables or disables HTTP keep-alive.
///
/// Default is true.
pub fn keep_alive(mut self, keep_alive: bool) -> Self {
self.keep_alive = keep_alive;
self
}

/// Register a new resource kind. Errors if `label` is already registered, or if the number of
/// registered resources on this server instance would exceed 8.
///
/// See the module documentation for [`resurce_limiting`](../jsonrpsee_utils/server/resource_limiting/index.html#resource-limiting)
/// See the module documentation for [`resource_limiting`](../jsonrpsee_utils/server/resource_limiting/index.html#resource-limiting)
/// for details.
pub fn register_resource(mut self, label: &'static str, capacity: u16, default: u16) -> Result<Self, Error> {
self.resources.register(label, capacity, default)?;
Expand All @@ -167,6 +157,56 @@ impl<M> Builder<M> {
self
}

/// Finalizes the configuration of the server with customized TCP settings on the socket.
///
///
/// ```rust
/// use jsonrpsee_http_server::{HttpServerBuilder, HyperTcpConfig};
/// use socket2::{Domain, Socket, Type};
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() {
/// let addr = "127.0.0.1:0".parse().unwrap();
/// let domain = Domain::for_address(addr);
/// let socket = Socket::new(domain, Type::STREAM, None).unwrap();
/// socket.set_nodelay(true).unwrap();
/// socket.set_reuse_address(true).unwrap();
/// socket.set_nonblocking(true).unwrap();
/// socket.set_keepalive(true).unwrap();
///
/// let address = addr.into();
/// socket.bind(&address).unwrap();
///
/// socket.listen(4096).unwrap();
///
/// // hyper does some settings on the provided socket, ensure that nothing breaks our "expected settings".
/// let hyper_cfg = HyperTcpConfig { sleep_on_accept_errors: true, keepalive_timeout: Some(Duration::from_secs(1)), no_delay: true };
///
/// let server = HttpServerBuilder::new().build_from_tcp(socket, hyper_cfg).unwrap();
/// }
/// ```
pub fn build_from_tcp(self, listener: impl Into<StdTcpListener>, cfg: HyperTcpConfig) -> Result<Server<M>, Error> {
Copy link
Member Author

@niklasad1 niklasad1 Apr 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsdw is this API reasonable?

Basically the only thing I want to ensure is that no settings gets overwritten without informing the users of this API.

It's a bit awkward to configure some settings twice but no way around that....

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah ok, so the issue is now basically that you can pass in a TcpListener, but you can't configure the Hyper server that is created from it.

One option might be to keep build_from_tcp() as is (which lets you pass in a pre-configured tcp listener but assumes you don't care about how the resulting hyper server is configured), and also have a build_from_hyper() as well (which lets you control everything about the tcp listener and hyper server if you wish). So then you'd have:

  • build -> can't configure socket, can't configure hyper.
  • build_from_tcp -> can configure tcp socket, can't configure hyper.
  • build_from_hyper -> can configure tcp, can configure hyper.

What do you reckon?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough, I think your suggestion is less awkward.

let listener = listener.into();
let local_addr = listener.local_addr().ok();

let listener = hyper::Server::from_tcp(listener)?
.tcp_sleep_on_accept_errors(cfg.sleep_on_accept_errors)
.tcp_keepalive(cfg.keepalive_timeout)
.tcp_nodelay(cfg.no_delay);

Ok(Server {
listener,
local_addr,
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
})
}

/// Finalizes the configuration of the server.
///
/// ```rust
Expand All @@ -178,57 +218,45 @@ impl<M> Builder<M> {
/// occupied_addr,
/// "127.0.0.1:0".parse().unwrap(),
/// ];
/// assert!(jsonrpsee_http_server::HttpServerBuilder::default().build(occupied_addr).is_err());
/// assert!(jsonrpsee_http_server::HttpServerBuilder::default().build(addrs).is_ok());
/// assert!(jsonrpsee_http_server::HttpServerBuilder::default().build(occupied_addr).await.is_err());
/// assert!(jsonrpsee_http_server::HttpServerBuilder::default().build(addrs).await.is_ok());
/// }
/// ```
pub fn build(self, addrs: impl ToSocketAddrs) -> Result<Server<M>, Error> {
let mut err: Option<Error> = None;

for addr in addrs.to_socket_addrs()? {
let (listener, local_addr) = match self.inner_builder(addr) {
Ok(res) => res,
Err(e) => {
err = Some(e);
continue;
}
};
pub async fn build(self, addrs: impl ToSocketAddrs) -> Result<Server<M>, Error> {
let listener = TcpListener::bind(addrs).await?.into_std()?;

return Ok(Server {
listener,
local_addr,
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
});
}
let local_addr = listener.local_addr().ok();
let listener = hyper::Server::from_tcp(listener)?.tcp_nodelay(true);
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved

let err = err.unwrap_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "No address found").into());
Err(err)
Ok(Server {
listener,
local_addr,
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
})
}
}

fn inner_builder(
&self,
addr: SocketAddr,
) -> Result<(hyper::server::Builder<hyper::server::conn::AddrIncoming>, Option<SocketAddr>), Error> {
let domain = Domain::for_address(addr);
let socket = Socket::new(domain, Type::STREAM, None)?;
socket.set_nodelay(true)?;
socket.set_reuse_address(true)?;
socket.set_nonblocking(true)?;
socket.set_keepalive(self.keep_alive)?;
let address = addr.into();
socket.bind(&address)?;

socket.listen(1024)?;
let listener: TcpListener = socket.into();
let local_addr = listener.local_addr().ok();
let listener = hyper::Server::from_tcp(listener)?;
Ok((listener, local_addr))
}
/// [`hyper`] does some TCP settings on the socket by default, this type provides a way
/// to configure it.
///
/// This type mimics configurations provided by [`hyper::server::AddrIncoming`].
#[derive(Debug, Copy, Clone)]
pub struct HyperTcpConfig {
/// Set whether to sleep on accepts errors.
pub sleep_on_accept_errors: bool,
/// Set whether TCP keepalive messages are enabled on accepted connections.
///
/// If `None` is specified, keepalive is disabled, otherwise the duration
/// specified will be the time to remain idle before sending TCP keepalive
/// probes.
pub keepalive_timeout: Option<Duration>,
/// Set the value of `TCP_NODELAY` option for accepted connections.
pub no_delay: bool,
}

/// Handle used to run or stop the server.
Expand Down
6 changes: 3 additions & 3 deletions http-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use jsonrpsee_test_utils::TimeoutFutureExt;
use serde_json::Value as JsonValue;

async fn server() -> (SocketAddr, ServerHandle) {
let server = HttpServerBuilder::default().build("127.0.0.1:0").unwrap();
let server = HttpServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let ctx = TestContext;
let mut module = RpcModule::new(ctx);
let addr = server.local_addr().unwrap();
Expand Down Expand Up @@ -410,7 +410,7 @@ async fn run_forever() {
async fn can_set_the_max_request_body_size() {
let addr = "127.0.0.1:0";
// Rejects all requests larger than 100 bytes
let server = HttpServerBuilder::default().max_request_body_size(100).build(addr).unwrap();
let server = HttpServerBuilder::default().max_request_body_size(100).build(addr).await.unwrap();
let mut module = RpcModule::new(());
module.register_method("anything", |_p, _cx| Ok("a".repeat(100))).unwrap();
let addr = server.local_addr().unwrap();
Expand All @@ -434,7 +434,7 @@ async fn can_set_the_max_request_body_size() {
async fn can_set_the_max_response_size() {
let addr = "127.0.0.1:0";
// Set the max response size to 100 bytes
let server = HttpServerBuilder::default().max_response_body_size(100).build(addr).unwrap();
let server = HttpServerBuilder::default().max_response_body_size(100).build(addr).await.unwrap();
let mut module = RpcModule::new(());
module.register_method("anything", |_p, _cx| Ok("a".repeat(101))).unwrap();
let addr = server.local_addr().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub async fn http_server() -> (SocketAddr, HttpServerHandle) {
}

pub async fn http_server_with_access_control(acl: AccessControl) -> (SocketAddr, HttpServerHandle) {
let server = HttpServerBuilder::default().set_access_control(acl).build("127.0.0.1:0").unwrap();
let server = HttpServerBuilder::default().set_access_control(acl).build("127.0.0.1:0").await.unwrap();
let mut module = RpcModule::new(());
let addr = server.local_addr().unwrap();
module.register_method("say_hello", |_, _| Ok("hello")).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion tests/tests/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ async fn http_server(module: RpcModule<()>, counter: Counter) -> Result<(SocketA
.register_resource("CPU", 6, 2)?
.register_resource("MEM", 10, 1)?
.set_middleware(counter)
.build("127.0.0.1:0")?;
.build("127.0.0.1:0")
.await?;

let addr = server.local_addr()?;
let handle = server.start(module)?;
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/proc_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ async fn multiple_blocking_calls_overlap() {

#[tokio::test]
async fn subscriptions_do_not_work_for_http_servers() {
let htserver = HttpServerBuilder::default().build("127.0.0.1:0").unwrap();
let htserver = HttpServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let addr = htserver.local_addr().unwrap();
let htserver_url = format!("http://{}", addr);
let _handle = htserver.start(RpcServerImpl.into_rpc()).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion tests/tests/resource_limiting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ async fn http_server(module: RpcModule<()>) -> Result<(SocketAddr, HttpServerHan
let server = HttpServerBuilder::default()
.register_resource("CPU", 6, 2)?
.register_resource("MEM", 10, 1)?
.build("127.0.0.1:0")?;
.build("127.0.0.1:0")
.await?;

let addr = server.local_addr()?;
let handle = server.start(module)?;
Expand Down