Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async/subscription benches #372

Merged
merged 2 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ publish = false

[dev-dependencies]
criterion = "0.3"
futures-channel = "0.3.14"
futures-channel = "0.3.15"
futures-util = "0.3.15"
jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
num_cpus = "1"
serde_json = "1"
Expand Down
194 changes: 158 additions & 36 deletions benches/bench.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,58 @@
use criterion::*;
use helpers::{SUB_METHOD_NAME, UNSUB_METHOD_NAME};
use jsonrpsee::{
http_client::{
traits::Client,
v2::params::{Id, JsonRpcParams},
v2::request::JsonRpcCallSer,
HttpClientBuilder,
},
types::traits::SubscriptionClient,
ws_client::WsClientBuilder,
};
use std::sync::Arc;
use tokio::runtime::Runtime as TokioRuntime;

mod helpers;

criterion_group!(benches, http_requests, batched_http_requests, websocket_requests, jsonrpsee_types_v2);
criterion_main!(benches);
criterion_group!(types_benches, jsonrpsee_types_v2);
criterion_group!(
sync_benches,
SyncBencher::http_requests,
SyncBencher::batched_http_requests,
SyncBencher::websocket_requests
);
criterion_group!(
async_benches,
AsyncBencher::http_requests,
AsyncBencher::batched_http_requests,
AsyncBencher::websocket_requests
);
criterion_group!(subscriptions, AsyncBencher::subscriptions);
criterion_main!(types_benches, sync_benches, async_benches, subscriptions);

#[derive(Debug, Clone, Copy)]
enum RequestType {
Sync,
Async,
}

impl RequestType {
fn method_name(self) -> &'static str {
match self {
RequestType::Sync => crate::helpers::SYNC_METHOD_NAME,
RequestType::Async => crate::helpers::ASYNC_METHOD_NAME,
}
}

fn group_name(self, name: &str) -> String {
let request_type_name = match self {
RequestType::Sync => "sync",
RequestType::Async => "async",
};
format!("{}/{}", request_type_name, name)
}
}

fn v2_serialize(req: JsonRpcCallSer<'_>) -> String {
serde_json::to_string(&req).unwrap()
Expand All @@ -39,53 +77,135 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
});
}

pub fn http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
run_round_trip(&rt, crit, client.clone(), "http_round_trip");
run_concurrent_round_trip(&rt, crit, client, "http_concurrent_round_trip");
}
trait RequestBencher {
const REQUEST_TYPE: RequestType;

pub fn batched_http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
run_round_trip_with_batch(&rt, crit, client, "http batch requests");
fn http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
run_round_trip(&rt, crit, client.clone(), "http_round_trip", Self::REQUEST_TYPE);
run_concurrent_round_trip(&rt, crit, client, "http_concurrent_round_trip", Self::REQUEST_TYPE);
}

fn batched_http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
run_round_trip_with_batch(&rt, crit, client, "http batch requests", Self::REQUEST_TYPE);
}

fn websocket_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_round_trip(&rt, crit, client.clone(), "ws_round_trip", Self::REQUEST_TYPE);
run_concurrent_round_trip(&rt, crit, client, "ws_concurrent_round_trip", Self::REQUEST_TYPE);
}

fn batched_ws_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_round_trip_with_batch(&rt, crit, client, "ws batch requests", Self::REQUEST_TYPE);
}

fn subscriptions(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_sub_round_trip(&rt, crit, client, "subscriptions");
}
}

pub fn websocket_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_round_trip(&rt, crit, client.clone(), "ws_round_trip");
run_concurrent_round_trip(&rt, crit, client, "ws_concurrent_round_trip");
pub struct SyncBencher;

impl RequestBencher for SyncBencher {
const REQUEST_TYPE: RequestType = RequestType::Sync;
}
pub struct AsyncBencher;

pub fn batched_ws_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_round_trip_with_batch(&rt, crit, client, "ws batch requests");
impl RequestBencher for AsyncBencher {
const REQUEST_TYPE: RequestType = RequestType::Async;
}

fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str) {
crit.bench_function(name, |b| {
fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str, request: RequestType) {
crit.bench_function(&request.group_name(name), |b| {
b.iter(|| {
rt.block_on(async {
black_box(client.request::<String>("say_hello", JsonRpcParams::NoParams).await.unwrap());
black_box(client.request::<String>(request.method_name(), JsonRpcParams::NoParams).await.unwrap());
})
})
});
}

/// Benchmark http batch requests over batch sizes of 2, 5, 10, 50 and 100 RPCs in each batch.
fn run_round_trip_with_batch(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str) {
fn run_sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl SubscriptionClient>, name: &str) {
Copy link
Member

@niklasad1 niklasad1 Jun 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a follow could be to subscribe to the "same subscription (by method name)" many times to benchmark how costly it is to access the Mutex when the number of subscribers grows. This Mutex should only accessed when creating a new subscription or dropping an existing one however so maybe not that interesting anymore.

currently we have one Arc<Mutex> per registered subscription (by method name)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be more right to write more specific benches for that matter. Like, ones that do not include real server and client, and thus impact of mutex will be more clear.

let mut group = crit.benchmark_group(name);
group.bench_function("subscribe", |b| {
b.iter_with_large_drop(|| {
rt.block_on(async {
black_box(
client
.subscribe::<String>(SUB_METHOD_NAME, JsonRpcParams::NoParams, UNSUB_METHOD_NAME)
.await
.unwrap(),
);
})
})
});
group.bench_function("subscribe_response", |b| {
b.iter_with_setup(
|| {
rt.block_on(async {
client
.subscribe::<String>(SUB_METHOD_NAME, JsonRpcParams::NoParams, UNSUB_METHOD_NAME)
.await
.unwrap()
})
},
|mut sub| {
rt.block_on(async { black_box(sub.next().await.unwrap()) });
Copy link
Member

@niklasad1 niklasad1 Jun 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, and you can't create the subscription outside the bench function because fn next needs &mut self, correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can, but in that case, it'd be quite difficult to create a representative benchmark.
bench function is called many times, thus if we will try to re-use the subscription made outside of the bench, then the subscription function will have to yield multiple values. If it'd be done without timeouts, it may eat RAM pretty quickly (sender is unbounded). If we will use timeouts, it may affect benchmark results.
In theory, we can try to synchronize somehow received/sent messages using some kind of semaphore, but once again this communication and overhead caused by it will affect the benchmark.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough sounds complicated make sense.

// Note that this benchmark will include costs for measuring `drop` for subscription,
// since it's not possible to combine both `iter_with_setup` and `iter_with_large_drop`.
// To estimate pure cost of method, one should subtract the result of `unsub` bench
// from this one.
},
)
});
group.bench_function("unsub", |b| {
b.iter_with_setup(
|| {
rt.block_on(async {
client
.subscribe::<String>(SUB_METHOD_NAME, JsonRpcParams::NoParams, UNSUB_METHOD_NAME)
.await
.unwrap()
})
},
|sub| {
// Subscription will be closed inside of the drop impl.
// Actually, it just sends a notification about object being closed,
// but it's still important to know that drop impl is not too expensive.
drop(black_box(sub));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this

},
)
});
}

/// Benchmark http batch requests over batch sizes of 2, 5, 10, 50 and 100 RPCs in each batch.
fn run_round_trip_with_batch(
rt: &TokioRuntime,
crit: &mut Criterion,
client: Arc<impl Client>,
name: &str,
request: RequestType,
) {
let mut group = crit.benchmark_group(request.group_name(name));
for batch_size in [2, 5, 10, 50, 100usize].iter() {
let batch = vec![("say_hello", JsonRpcParams::NoParams); *batch_size];
let batch = vec![(request.method_name(), JsonRpcParams::NoParams); *batch_size];
group.throughput(Throughput::Elements(*batch_size as u64));
group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| {
b.iter(|| rt.block_on(async { client.batch_request::<String>(batch.clone()).await.unwrap() }))
Expand All @@ -99,17 +219,19 @@ fn run_concurrent_round_trip<C: 'static + Client + Send + Sync>(
crit: &mut Criterion,
client: Arc<C>,
name: &str,
request: RequestType,
) {
let mut group = crit.benchmark_group(name);
let mut group = crit.benchmark_group(request.group_name(name));
for num_concurrent_tasks in helpers::concurrent_tasks() {
group.bench_function(format!("{}", num_concurrent_tasks), |b| {
b.iter(|| {
let mut tasks = Vec::new();
for _ in 0..num_concurrent_tasks {
let client_rc = client.clone();
let task = rt.spawn(async move {
let _ =
black_box(client_rc.request::<String>("say_hello", JsonRpcParams::NoParams).await.unwrap());
let _ = black_box(
client_rc.request::<String>(request.method_name(), JsonRpcParams::NoParams).await.unwrap(),
);
});
tasks.push(task);
}
Expand Down
20 changes: 18 additions & 2 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
use futures_channel::oneshot;
use futures_util::future::FutureExt;
use jsonrpsee::{
http_server::HttpServerBuilder,
ws_server::{RpcModule, WsServerBuilder},
};

pub(crate) const SYNC_METHOD_NAME: &str = "say_hello";
pub(crate) const ASYNC_METHOD_NAME: &str = "say_hello_async";
pub(crate) const SUB_METHOD_NAME: &str = "sub";
pub(crate) const UNSUB_METHOD_NAME: &str = "unsub";

/// Run jsonrpsee HTTP server for benchmarks.
pub async fn http_server() -> String {
let (server_started_tx, server_started_rx) = oneshot::channel();
tokio::spawn(async move {
let mut server =
HttpServerBuilder::default().max_request_body_size(u32::MAX).build("127.0.0.1:0".parse().unwrap()).unwrap();
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo")).unwrap();
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
server.register_module(module).unwrap();
server_started_tx.send(server.local_addr().unwrap()).unwrap();
server.start().await
Expand All @@ -25,7 +32,16 @@ pub async fn ws_server() -> String {
tokio::spawn(async move {
let mut server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo")).unwrap();
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
module
.register_subscription(SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
})
.unwrap();

server.register_module(module).unwrap();
server_started_tx.send(server.local_addr().unwrap()).unwrap();
server.start().await
Expand Down