benches: add option to run benchmarks against jsonrpc crate servers (#527)

* fix http client bench with request limit

* benches for jsonrpc servers

* workaround; dont use max request limit

* add subscriptions

* revert unintentional change

* ignore batch request bench for ws

* fmt

* log -> tracing

* test bench CI

* test bench v0.3

* wtf; run CI

* work plz

* remove test CI bench

* fix compile warn on macos
niklasad1 authored Oct 17, 2021
1 parent 37474f4 commit 0b43555
Showing 5 changed files with 117 additions and 38 deletions.
7 changes: 7 additions & 0 deletions benches/Cargo.toml
@@ -12,6 +12,9 @@ criterion = { version = "0.3", features = ["async_tokio", "html_reports"] }
futures-channel = "0.3.15"
futures-util = "0.3.15"
jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
jsonrpc-ws-server = "18.0.0"
jsonrpc-http-server = "18.0.0"
jsonrpc-pubsub = "18.0.0"
num_cpus = "1"
serde_json = "1"
tokio = { version = "1", features = ["full"] }
@@ -20,3 +23,7 @@ tokio = { version = "1", features = ["full"] }
name = "bench"
path = "bench.rs"
harness = false

[features]
# Run benchmarks against servers in https://github.com/paritytech/jsonrpc/
jsonrpc-crate = []
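
For context, the jsonrpc-crate feature only switches which set of server helpers is compiled (see the benches/helpers.rs changes below); the benchmark code calls the same helper functions either way, and the jsonrpc servers would presumably be run with something like cargo bench --features jsonrpc-crate from the benches crate. A minimal sketch of the gating pattern, with the function bodies elided:

// Illustration only: both variants expose the same helper signature, so bench.rs
// does not need to care which backend the feature selects.
#[cfg(feature = "jsonrpc-crate")]
pub async fn http_server(_handle: tokio::runtime::Handle) -> (String, jsonrpc_http_server::Server) {
    todo!("start a jsonrpc-http-server on the given runtime handle")
}

#[cfg(not(feature = "jsonrpc-crate"))]
pub async fn http_server(_handle: tokio::runtime::Handle) -> (String, jsonrpsee::http_server::HttpStopHandle) {
    todo!("start a jsonrpsee HTTP server on the given runtime handle")
}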
18 changes: 11 additions & 7 deletions benches/bench.rs
@@ -20,13 +20,17 @@ criterion_group!(
sync_benches,
SyncBencher::http_requests,
SyncBencher::batched_http_requests,
SyncBencher::websocket_requests
SyncBencher::websocket_requests,
// TODO: https://github.com/paritytech/jsonrpsee/issues/528
// SyncBencher::batched_ws_requests,
);
criterion_group!(
async_benches,
AsyncBencher::http_requests,
AsyncBencher::batched_http_requests,
AsyncBencher::websocket_requests
AsyncBencher::websocket_requests,
// TODO: https://github.com/paritytech/jsonrpsee/issues/528
// AsyncBencher::batched_ws_requests
);
criterion_group!(subscriptions, AsyncBencher::subscriptions);
criterion_main!(types_benches, sync_benches, async_benches, subscriptions);
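
The benchmark functions registered in these groups delegate to round-trip drivers (run_round_trip, run_round_trip_with_batch, run_sub_round_trip, run_concurrent_round_trip) that are defined elsewhere in bench.rs and not shown in this diff; roughly, they follow criterion's async-bench pattern via the async_tokio feature enabled in benches/Cargo.toml. A minimal sketch, with the actual RPC call elided since the exact client trait method depends on the jsonrpsee version:

use criterion::Criterion;
use tokio::runtime::Runtime as TokioRuntime;

// Rough shape of an async round-trip benchmark (illustration only; not the real driver).
fn round_trip_sketch(rt: &TokioRuntime, crit: &mut Criterion, name: &str) {
    crit.bench_function(name, |b| {
        b.to_async(rt).iter(|| async {
            // Issue one RPC request against the shared client here and await the
            // response; the call itself is elided on purpose.
        })
    });
}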
@@ -82,7 +86,7 @@ trait RequestBencher {

fn http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (url, _handle) = rt.block_on(helpers::http_server());
let (url, _server) = rt.block_on(helpers::http_server(rt.handle().clone()));
let client = Arc::new(HttpClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url).unwrap());
run_round_trip(&rt, crit, client.clone(), "http_round_trip", Self::REQUEST_TYPE);
run_concurrent_round_trip(&rt, crit, client, "http_concurrent_round_trip", Self::REQUEST_TYPE);
@@ -91,14 +95,14 @@ trait RequestBencher {

fn batched_http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (url, _handle) = rt.block_on(helpers::http_server());
let (url, _server) = rt.block_on(helpers::http_server(rt.handle().clone()));
let client = Arc::new(HttpClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url).unwrap());
run_round_trip_with_batch(&rt, crit, client, "http batch requests", Self::REQUEST_TYPE);
}

fn websocket_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let (url, _server) = rt.block_on(helpers::ws_server(rt.handle().clone()));
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_round_trip(&rt, crit, client.clone(), "ws_round_trip", Self::REQUEST_TYPE);
@@ -108,15 +112,15 @@ trait RequestBencher {

fn batched_ws_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let (url, _server) = rt.block_on(helpers::ws_server(rt.handle().clone()));
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_round_trip_with_batch(&rt, crit, client, "ws batch requests", Self::REQUEST_TYPE);
}

fn subscriptions(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let (url, _server) = rt.block_on(helpers::ws_server(rt.handle().clone()));
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_sub_round_trip(&rt, crit, client, "subscriptions");
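One detail in the hunks above: the helpers now return the server (or its stop handle) together with the URL, and the benchmarks bind it to _server rather than _. Binding to _ would drop the value immediately, and at least the jsonrpc crate servers shut themselves down when their Server value is dropped, so the handle has to stay in scope for the whole run. Assumed usage, mirroring the code above (setup_for_bench is a hypothetical name used only for illustration):

// Keep the returned server value alive for the duration of the benchmark.
fn setup_for_bench() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let (_url, _server) = rt.block_on(helpers::http_server(rt.handle().clone()));
    // ... build a client against _url and run the criterion benchmarks while
    // _server is still in scope ...
}
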
125 changes: 97 additions & 28 deletions benches/helpers.rs
@@ -1,18 +1,83 @@
use futures_channel::oneshot;
use jsonrpsee::{
http_server::{HttpServerBuilder, HttpStopHandle},
ws_server::{RpcModule, WsServerBuilder},
};

pub(crate) const SYNC_METHOD_NAME: &str = "say_hello";
pub(crate) const ASYNC_METHOD_NAME: &str = "say_hello_async";
pub(crate) const SUB_METHOD_NAME: &str = "sub";
pub(crate) const UNSUB_METHOD_NAME: &str = "unsub";

/// Run jsonrpc HTTP server for benchmarks.
#[cfg(feature = "jsonrpc-crate")]
pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_http_server::Server) {
use jsonrpc_http_server::jsonrpc_core::*;
use jsonrpc_http_server::*;

let mut io = IoHandler::new();
io.add_sync_method(SYNC_METHOD_NAME, |_| Ok(Value::String("lo".to_string())));
io.add_method(ASYNC_METHOD_NAME, |_| async { Ok(Value::String("lo".to_string())) });

let server = ServerBuilder::new(io)
.max_request_body_size(usize::MAX)
.event_loop_executor(handle)
.start_http(&"127.0.0.1:0".parse().unwrap())
.expect("Server must start with no issues");

let addr = *server.address();
(format!("http://{}", addr), server)
}

/// Run jsonrpc WebSocket server for benchmarks.
#[cfg(feature = "jsonrpc-crate")]
pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_ws_server::Server) {
use jsonrpc_pubsub::{PubSubHandler, Session, Subscriber, SubscriptionId};
use jsonrpc_ws_server::jsonrpc_core::*;
use jsonrpc_ws_server::*;
use std::sync::atomic::{AtomicU64, Ordering};

static ID: AtomicU64 = AtomicU64::new(0);

let handle2 = handle.clone();

let mut io = PubSubHandler::new(MetaIoHandler::default());
io.add_sync_method(SYNC_METHOD_NAME, |_| Ok(Value::String("lo".to_string())));
io.add_method(ASYNC_METHOD_NAME, |_| async { Ok(Value::String("lo".to_string())) });
io.add_subscription(
SUB_METHOD_NAME,
(SUB_METHOD_NAME, move |_params: Params, _, subscriber: Subscriber| {
handle2.spawn(async move {
let id = ID.fetch_add(1, Ordering::Relaxed);
let sink = subscriber.assign_id(SubscriptionId::Number(id)).unwrap();
// NOTE(niklasad1): because of how jsonrpc works, this is the only way to get this working:
// jsonrpc answers the subscription request in the background, so it is not possible to know
// when the request has been answered, which makes this benchmark unreliable.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let mut map = serde_json::Map::new();
map.insert("subscription".into(), id.into());
map.insert("result".into(), "hello".into());
let _ = sink.notify(Params::Map(map));
});
}),
(UNSUB_METHOD_NAME, |_id: SubscriptionId, _| futures::future::ok(Value::Bool(true))),
);

let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
std::sync::Arc::new(Session::new(context.sender().clone()))
})
.event_loop_executor(handle)
.start(&"127.0.0.1:0".parse().unwrap())
.expect("Server must start with no issues");

let addr = *server.addr();
(format!("ws://{}", addr), server)
}

/// Run jsonrpsee HTTP server for benchmarks.
pub async fn http_server() -> (String, HttpStopHandle) {
let server =
HttpServerBuilder::default().max_request_body_size(u32::MAX).build("127.0.0.1:0".parse().unwrap()).unwrap();
#[cfg(not(feature = "jsonrpc-crate"))]
pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::http_server::HttpStopHandle) {
use jsonrpsee::http_server::{HttpServerBuilder, RpcModule};

let server = HttpServerBuilder::default()
.max_request_body_size(u32::MAX)
.custom_tokio_runtime(handle)
.build("127.0.0.1:0".parse().unwrap())
.unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| async { Ok("lo") }).unwrap();
@@ -22,25 +87,29 @@ pub async fn http_server() -> (String, HttpStopHandle) {
}

/// Run jsonrpsee WebSocket server for benchmarks.
pub async fn ws_server() -> String {
let (server_started_tx, server_started_rx) = oneshot::channel();
tokio::spawn(async move {
let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| async { Ok("lo") }).unwrap();
module
.register_subscription(SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
})
.unwrap();

server_started_tx.send(server.local_addr().unwrap()).unwrap();
server.start(module).unwrap()
});
format!("ws://{}", server_started_rx.await.unwrap())
#[cfg(not(feature = "jsonrpc-crate"))]
pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws_server::WsStopHandle) {
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};

let server = WsServerBuilder::default()
.max_request_body_size(u32::MAX)
.custom_tokio_runtime(handle)
.build("127.0.0.1:0")
.await
.unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| async { Ok("lo") }).unwrap();
module
.register_subscription(SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
})
.unwrap();
let addr = format!("ws://{}", server.local_addr().unwrap());
let handle = server.start(module).unwrap();
(addr, handle)
}

/// Get the number of concurrent tasks based on num_cpus.
3 changes: 1 addition & 2 deletions tests/tests/proc_macros.rs
@@ -26,7 +26,6 @@

//! Example of using proc macro to generate working client and server.
use std::iter;
use std::net::SocketAddr;

use jsonrpsee::{ws_client::*, ws_server::WsServerBuilder};
@@ -293,7 +292,7 @@ async fn multiple_blocking_calls_overlap() {

let params = RawValue::from_string("[]".into()).ok();

let futures = iter::repeat_with(|| module.call("foo_blocking_call", params.clone())).take(4);
let futures = std::iter::repeat_with(|| module.call("foo_blocking_call", params.clone())).take(4);
let now = Instant::now();
let results = futures::future::join_all(futures).await;
let elapsed = now.elapsed();
2 changes: 1 addition & 1 deletion utils/src/server/rpc_module.rs
@@ -494,7 +494,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
drop(claimed);
})
.map(|err| {
log::error!("Join error for blocking RPC method: {:?}", err);
tracing::error!("Join error for blocking RPC method: {:?}", err);
})
.boxed()
})),