Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

benches: add option to run benchmarks against jsonrpc crate servers #527

Merged
merged 16 commits into from
Oct 17, 2021
Merged
7 changes: 7 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ criterion = { version = "0.3", features = ["async_tokio", "html_reports"] }
futures-channel = "0.3.15"
futures-util = "0.3.15"
jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
jsonrpc-ws-server = "18.0.0"
jsonrpc-http-server = "18.0.0"
jsonrpc-pubsub = "18.0.0"
num_cpus = "1"
serde_json = "1"
tokio = { version = "1", features = ["full"] }
Expand All @@ -20,3 +23,7 @@ tokio = { version = "1", features = ["full"] }
name = "bench"
path = "bench.rs"
harness = false

[features]
# Run benchmarks against servers in https://github.com/paritytech/jsonrpc/
jsonrpc-crate = []
18 changes: 11 additions & 7 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ criterion_group!(
sync_benches,
SyncBencher::http_requests,
SyncBencher::batched_http_requests,
SyncBencher::websocket_requests
SyncBencher::websocket_requests,
// TODO: https://github.com/paritytech/jsonrpsee/issues/528
// SyncBencher::batched_ws_requests,
);
criterion_group!(
async_benches,
AsyncBencher::http_requests,
AsyncBencher::batched_http_requests,
AsyncBencher::websocket_requests
AsyncBencher::websocket_requests,
// TODO: https://github.com/paritytech/jsonrpsee/issues/528
// AsyncBencher::batched_ws_requests
);
criterion_group!(subscriptions, AsyncBencher::subscriptions);
criterion_main!(types_benches, sync_benches, async_benches, subscriptions);
Expand Down Expand Up @@ -82,7 +86,7 @@ trait RequestBencher {

fn http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (url, _handle) = rt.block_on(helpers::http_server());
let (url, _server) = rt.block_on(helpers::http_server(rt.handle().clone()));
let client = Arc::new(HttpClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url).unwrap());
run_round_trip(&rt, crit, client.clone(), "http_round_trip", Self::REQUEST_TYPE);
run_concurrent_round_trip(&rt, crit, client, "http_concurrent_round_trip", Self::REQUEST_TYPE);
Expand All @@ -91,14 +95,14 @@ trait RequestBencher {

fn batched_http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (url, _handle) = rt.block_on(helpers::http_server());
let (url, _server) = rt.block_on(helpers::http_server(rt.handle().clone()));
let client = Arc::new(HttpClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url).unwrap());
run_round_trip_with_batch(&rt, crit, client, "http batch requests", Self::REQUEST_TYPE);
}

fn websocket_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let (url, _server) = rt.block_on(helpers::ws_server(rt.handle().clone()));
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_round_trip(&rt, crit, client.clone(), "ws_round_trip", Self::REQUEST_TYPE);
Expand All @@ -108,15 +112,15 @@ trait RequestBencher {

fn batched_ws_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let (url, _server) = rt.block_on(helpers::ws_server(rt.handle().clone()));
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_round_trip_with_batch(&rt, crit, client, "ws batch requests", Self::REQUEST_TYPE);
}

fn subscriptions(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let (url, _server) = rt.block_on(helpers::ws_server(rt.handle().clone()));
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_sub_round_trip(&rt, crit, client, "subscriptions");
Expand Down
125 changes: 97 additions & 28 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,83 @@
use futures_channel::oneshot;
use jsonrpsee::{
http_server::{HttpServerBuilder, HttpStopHandle},
ws_server::{RpcModule, WsServerBuilder},
};

pub(crate) const SYNC_METHOD_NAME: &str = "say_hello";
pub(crate) const ASYNC_METHOD_NAME: &str = "say_hello_async";
pub(crate) const SUB_METHOD_NAME: &str = "sub";
pub(crate) const UNSUB_METHOD_NAME: &str = "unsub";

/// Run jsonrpc HTTP server for benchmarks.
#[cfg(feature = "jsonrpc-crate")]
pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_http_server::Server) {
use jsonrpc_http_server::jsonrpc_core::*;
use jsonrpc_http_server::*;

let mut io = IoHandler::new();
io.add_sync_method(SYNC_METHOD_NAME, |_| Ok(Value::String("lo".to_string())));
io.add_method(ASYNC_METHOD_NAME, |_| async { Ok(Value::String("lo".to_string())) });

let server = ServerBuilder::new(io)
.max_request_body_size(usize::MAX)
.event_loop_executor(handle)
.start_http(&"127.0.0.1:0".parse().unwrap())
.expect("Server must start with no issues");

let addr = *server.address();
(format!("http://{}", addr), server)
}

/// Run jsonrpc WebSocket server for benchmarks.
#[cfg(feature = "jsonrpc-crate")]
pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_ws_server::Server) {
use jsonrpc_pubsub::{PubSubHandler, Session, Subscriber, SubscriptionId};
use jsonrpc_ws_server::jsonrpc_core::*;
use jsonrpc_ws_server::*;
use std::sync::atomic::{AtomicU64, Ordering};

const ID: AtomicU64 = AtomicU64::new(0);

let handle2 = handle.clone();

let mut io = PubSubHandler::new(MetaIoHandler::default());
io.add_sync_method(SYNC_METHOD_NAME, |_| Ok(Value::String("lo".to_string())));
io.add_method(ASYNC_METHOD_NAME, |_| async { Ok(Value::String("lo".to_string())) });
io.add_subscription(
SUB_METHOD_NAME,
(SUB_METHOD_NAME, move |_params: Params, _, subscriber: Subscriber| {
handle2.spawn(async move {
let id = ID.fetch_add(1, Ordering::Relaxed);
let sink = subscriber.assign_id(SubscriptionId::Number(id)).unwrap();
// NOTE(niklasad1): the way jsonrpc works this is the only way to get this working
// -> jsonrpc responds to the request in background so not possible to know when the request
// has been answered, so this benchmark is bad.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let mut map = serde_json::Map::new();
map.insert("subscription".into(), id.into());
map.insert("result".into(), "hello".into());
let _ = sink.notify(Params::Map(map));
});
}),
(UNSUB_METHOD_NAME, |_id: SubscriptionId, _| futures::future::ok(Value::Bool(true))),
);

let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
std::sync::Arc::new(Session::new(context.sender().clone()))
})
.event_loop_executor(handle)
.start(&"127.0.0.1:0".parse().unwrap())
.expect("Server must start with no issues");

let addr = *server.addr();
(format!("ws://{}", addr), server)
}

/// Run jsonrpsee HTTP server for benchmarks.
pub async fn http_server() -> (String, HttpStopHandle) {
let server =
HttpServerBuilder::default().max_request_body_size(u32::MAX).build("127.0.0.1:0".parse().unwrap()).unwrap();
#[cfg(not(feature = "jsonrpc-crate"))]
pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::http_server::HttpStopHandle) {
use jsonrpsee::http_server::{HttpServerBuilder, RpcModule};

let server = HttpServerBuilder::default()
.max_request_body_size(u32::MAX)
.custom_tokio_runtime(handle)
.build("127.0.0.1:0".parse().unwrap())
.unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| async { Ok("lo") }).unwrap();
Expand All @@ -22,25 +87,29 @@ pub async fn http_server() -> (String, HttpStopHandle) {
}

/// Run jsonrpsee WebSocket server for benchmarks.
pub async fn ws_server() -> String {
let (server_started_tx, server_started_rx) = oneshot::channel();
tokio::spawn(async move {
let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| async { Ok("lo") }).unwrap();
module
.register_subscription(SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
})
.unwrap();

server_started_tx.send(server.local_addr().unwrap()).unwrap();
server.start(module).unwrap()
});
format!("ws://{}", server_started_rx.await.unwrap())
#[cfg(not(feature = "jsonrpc-crate"))]
pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws_server::WsStopHandle) {
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};

let server = WsServerBuilder::default()
.max_request_body_size(u32::MAX)
.custom_tokio_runtime(handle)
.build("127.0.0.1:0")
.await
.unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| async { Ok("lo") }).unwrap();
module
.register_subscription(SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
})
.unwrap();
let addr = format!("ws://{}", server.local_addr().unwrap());
let handle = server.start(module).unwrap();
(addr, handle)
}

/// Get number of concurrent tasks based on the num_cpus.
Expand Down
3 changes: 1 addition & 2 deletions tests/tests/proc_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

//! Example of using proc macro to generate working client and server.

use std::iter;
use std::net::SocketAddr;

use jsonrpsee::{ws_client::*, ws_server::WsServerBuilder};
Expand Down Expand Up @@ -293,7 +292,7 @@ async fn multiple_blocking_calls_overlap() {

let params = RawValue::from_string("[]".into()).ok();

let futures = iter::repeat_with(|| module.call("foo_blocking_call", params.clone())).take(4);
let futures = std::iter::repeat_with(|| module.call("foo_blocking_call", params.clone())).take(4);
let now = Instant::now();
let results = futures::future::join_all(futures).await;
let elapsed = now.elapsed();
Expand Down
2 changes: 1 addition & 1 deletion utils/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
drop(claimed);
})
.map(|err| {
log::error!("Join error for blocking RPC method: {:?}", err);
tracing::error!("Join error for blocking RPC method: {:?}", err);
})
.boxed()
})),
Expand Down