Skip to content

Commit

Permalink
Optimize serialization for client parameters (#864)
Browse files Browse the repository at this point in the history
* core: Fix doc typo

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* types: Implement generic `ParamBuilder` for RPC parameters

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* types: Add specialized RPC parameter builder for arrays and maps

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* types: Implement parameter builder for batch requests

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* types: Implement `rpc_params` in the `types` crate

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* core: Adjust `ClientT` for generic efficient parameters

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* proc-macro: Render clients using the parameter builders

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Adjust testing to the `ToRpcParams` interface

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* core: Move `rpc_params` to core and simplify testing

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* core: Rename server's trait to `ToRpcServerParams`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* bench: Adjust benches to the `ToRpcParams` interface

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Fix clippy

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* types: Rename batch builder to `BatchRequestBuilder`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* examples: Re-enable proc-macro example

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* types: Fix doc tests and add panic documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* core: Fix documentation link

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* client: Use BatchRequestBuilder as parameter for batch requests

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update core/src/server/rpc_module.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* Update core/src/server/rpc_module.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* types: Add specialized constructors for internal `ParamsBuilder`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* types: Implement `EmptyParams` for client's parameters

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Fix macos disabled test

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* types: Improve comment

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Fix clippy

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* benches: Rename functions

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* types: Rename param types to `ArrayParams` and `ObjectParams`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Move paramters to core crate

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* core: Return `core::Error` from `ToRpcParams` trait

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Fix doc link

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Fix `ArrayParamsBuilder` doc links

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Remove `ToRpcServerParams` trait

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* core: Fix `ToRpcParams` docs

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Remove `ParamsSer` and extend benchmarking

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* core: Optimise `rpc_params` to avoid allocation on error

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* params: zero allocation for empty params

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* examples: Add copyright back

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* traits: Remove empty doc line

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update core/src/traits.rs

Co-authored-by: James Wilson <james@jsdw.me>

* Update core/src/traits.rs

Co-authored-by: James Wilson <james@jsdw.me>

* examples: Restore `proc_macro` example to origin/master

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* core: Remove empty case for `rpc_params` macro

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
Co-authored-by: James Wilson <james@jsdw.me>
  • Loading branch information
3 people authored Sep 6, 2022
1 parent 5a2f6f1 commit 41b8a2c
Show file tree
Hide file tree
Showing 32 changed files with 785 additions and 366 deletions.
80 changes: 62 additions & 18 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use futures_util::future::{join_all, FutureExt};
use futures_util::stream::FuturesUnordered;
use helpers::{http_client, ws_client, SUB_METHOD_NAME, UNSUB_METHOD_NAME};
use jsonrpsee::core::client::{ClientT, SubscriptionClientT};
use jsonrpsee::core::params::{ArrayParams, BatchRequestBuilder, ObjectParams};
use jsonrpsee::core::traits::ToRpcParams;
use jsonrpsee::http_client::HeaderMap;
use jsonrpsee::types::{Id, ParamsSer, RequestSer};
use jsonrpsee::types::{Id, RequestSer};
use pprof::criterion::{Output, PProfProfiler};
use tokio::runtime::Runtime as TokioRuntime;

Expand Down Expand Up @@ -63,22 +65,46 @@ fn v2_serialize(req: RequestSer<'_>) -> String {
}

pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
crit.bench_function("jsonrpsee_types_v2_array_ref", |b| {
// Construct the serialized array request using the `RawValue` directly.
crit.bench_function("jsonrpsee_types_array_params_baseline", |b| {
b.iter(|| {
let params = &[1_u64.into(), 2_u32.into()];
let params = ParamsSer::ArrayRef(params);
let params = serde_json::value::RawValue::from_string("[1, 2]".to_string()).unwrap();

let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
v2_serialize(request);
})
});
// Construct the serialized request using the `ArrayParams`.
crit.bench_function("jsonrpsee_types_array_params", |b| {
b.iter(|| {
let mut builder = ArrayParams::new();
builder.insert(1u64).unwrap();
builder.insert(2u32).unwrap();
let params = builder.to_rpc_params().expect("Valid params");
let request = RequestSer::new(&Id::Number(0), "say_hello", params);
v2_serialize(request);
})
});

crit.bench_function("jsonrpsee_types_v2_vec", |b| {
// Construct the serialized object request using the `RawValue` directly.
crit.bench_function("jsonrpsee_types_object_params_baseline", |b| {
b.iter(|| {
let params = ParamsSer::Array(vec![1_u64.into(), 2_u32.into()]);
let params = serde_json::value::RawValue::from_string(r#"{"key": 1}"#.to_string()).unwrap();

let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
v2_serialize(request);
})
});
// Construct the serialized request using the `ObjectParams`.
crit.bench_function("jsonrpsee_types_object_params", |b| {
b.iter(|| {
let mut builder = ObjectParams::new();
builder.insert("key", 1u32).unwrap();
let params = builder.to_rpc_params().expect("Valid params");
let request = RequestSer::new(&Id::Number(0), "say_hello", params);
v2_serialize(request);
})
});
}

trait RequestBencher {
Expand Down Expand Up @@ -129,7 +155,7 @@ fn round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl ClientT>
let bench_name = format!("{}/{}", name, method);
crit.bench_function(&request.group_name(&bench_name), |b| {
b.to_async(rt).iter(|| async {
black_box(client.request::<String>(method, None).await.unwrap());
black_box(client.request::<String, ArrayParams>(method, ArrayParams::new()).await.unwrap());
})
});
}
Expand All @@ -139,7 +165,12 @@ fn sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Subs
let mut group = crit.benchmark_group(name);
group.bench_function("subscribe", |b| {
b.to_async(rt).iter_with_large_drop(|| async {
black_box(client.subscribe::<String>(SUB_METHOD_NAME, None, UNSUB_METHOD_NAME).await.unwrap());
black_box(
client
.subscribe::<String, ArrayParams>(SUB_METHOD_NAME, ArrayParams::new(), UNSUB_METHOD_NAME)
.await
.unwrap(),
);
})
});
group.bench_function("subscribe_response", |b| {
Expand All @@ -149,7 +180,10 @@ fn sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Subs
// runtime context and simply calling `block_on` here will cause the code to panic.
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
client.subscribe::<String>(SUB_METHOD_NAME, None, UNSUB_METHOD_NAME).await.unwrap()
client
.subscribe::<String, ArrayParams>(SUB_METHOD_NAME, ArrayParams::new(), UNSUB_METHOD_NAME)
.await
.unwrap()
})
})
},
Expand All @@ -166,7 +200,10 @@ fn sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Subs
b.iter_with_setup(
|| {
rt.block_on(async {
client.subscribe::<String>(SUB_METHOD_NAME, None, UNSUB_METHOD_NAME).await.unwrap()
client
.subscribe::<String, ArrayParams>(SUB_METHOD_NAME, ArrayParams::new(), UNSUB_METHOD_NAME)
.await
.unwrap()
})
},
|sub| {
Expand All @@ -191,7 +228,10 @@ fn batch_round_trip(
let bench_name = format!("{}/{}", name, method);
let mut group = crit.benchmark_group(request.group_name(&bench_name));
for batch_size in [2, 5, 10, 50, 100usize].iter() {
let batch = vec![(method, None); *batch_size];
let mut batch = BatchRequestBuilder::new();
for _ in 0..*batch_size {
batch.insert(method, ArrayParams::new()).unwrap();
}
group.throughput(Throughput::Elements(*batch_size as u64));
group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| {
b.to_async(rt).iter(|| async { client.batch_request::<String>(batch.clone()).await.unwrap() })
Expand Down Expand Up @@ -227,7 +267,7 @@ fn ws_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str,
let futs = FuturesUnordered::new();

for _ in 0..10 {
futs.push(client.request::<String>(methods[0], None));
futs.push(client.request::<String, ArrayParams>(methods[0], ArrayParams::new()));
}

join_all(futs).await;
Expand Down Expand Up @@ -267,13 +307,17 @@ fn ws_concurrent_conn_subs(rt: &TokioRuntime, crit: &mut Criterion, url: &str, n
let futs = FuturesUnordered::new();

for _ in 0..10 {
let fut = client.subscribe::<String>(SUB_METHOD_NAME, None, UNSUB_METHOD_NAME).then(
|sub| async move {
let fut = client
.subscribe::<String, ArrayParams>(
SUB_METHOD_NAME,
ArrayParams::new(),
UNSUB_METHOD_NAME,
)
.then(|sub| async move {
let mut s = sub.unwrap();

s.next().await.unwrap().unwrap()
},
);
});

futs.push(Box::pin(fut));
}
Expand Down Expand Up @@ -301,7 +345,7 @@ fn http_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str
|clients| async {
let tasks = clients.map(|client| {
rt.spawn(async move {
client.request::<String>(method, None).await.unwrap();
client.request::<String, ArrayParams>(method, ArrayParams::new()).await.unwrap();
})
});
join_all(tasks).await;
Expand Down Expand Up @@ -333,7 +377,7 @@ fn http_custom_headers_round_trip(

crit.bench_function(&request.group_name(&bench_name), |b| {
b.to_async(rt).iter(|| async {
black_box(client.request::<String>(method_name, None).await.unwrap());
black_box(client.request::<String, ArrayParams>(method_name, ArrayParams::new()).await.unwrap());
})
});
}
Expand Down
23 changes: 17 additions & 6 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ use std::sync::Arc;
use std::time::Duration;

use crate::transport::HttpTransportClient;
use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response};
use crate::types::{ErrorResponse, Id, NotificationSer, RequestSer, Response};
use async_trait::async_trait;
use hyper::http::HeaderMap;
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::tracing::RpcTracing;
use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::CallError;
use rustc_hash::FxHashMap;
Expand Down Expand Up @@ -166,9 +168,13 @@ pub struct HttpClient {

#[async_trait]
impl ClientT for HttpClient {
async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
where
Params: ToRpcParams + Send,
{
let trace = RpcTracing::notification(method);
async {
let params = params.to_rpc_params()?;
let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;

let fut = self.transport.send(notif);
Expand All @@ -184,12 +190,15 @@ impl ClientT for HttpClient {
}

/// Perform a request towards the server.
async fn request<'a, R>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<R, Error>
async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, Error>
where
R: DeserializeOwned,
Params: ToRpcParams + Send,
{
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let params = params.to_rpc_params()?;

let request = RequestSer::new(&id, method, params);
let trace = RpcTracing::method_call(method);

Expand Down Expand Up @@ -225,10 +234,11 @@ impl ClientT for HttpClient {
.await
}

async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option<ParamsSer<'a>>)>) -> Result<Vec<R>, Error>
async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<Vec<R>, Error>
where
R: DeserializeOwned + Default + Clone,
{
let batch = batch.build();
let guard = self.id_manager.next_request_ids(batch.len())?;
let ids: Vec<Id> = guard.inner();
let trace = RpcTracing::batch();
Expand Down Expand Up @@ -279,13 +289,14 @@ impl ClientT for HttpClient {
#[async_trait]
impl SubscriptionClientT for HttpClient {
/// Send a subscription request to the server. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`].
async fn subscribe<'a, N>(
async fn subscribe<'a, N, Params>(
&self,
_subscribe_method: &'a str,
_params: Option<ParamsSer<'a>>,
_params: Params,
_unsubscribe_method: &'a str,
) -> Result<Subscription<N>, Error>
where
Params: ToRpcParams + Send,
N: DeserializeOwned,
{
Err(Error::HttpNotImplemented)
Expand Down
31 changes: 17 additions & 14 deletions client/http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
// DEALINGS IN THE SOFTWARE.

use crate::types::error::{ErrorCode, ErrorObject};
use crate::types::ParamsSer;

use crate::HttpClientBuilder;
use jsonrpsee_core::client::{ClientT, IdKind};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::rpc_params;
use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
Expand All @@ -52,10 +53,8 @@ async fn method_call_with_wrong_id_kind() {
http_server_with_hardcoded_response(ok_response(exp.into(), Id::Num(0))).with_default_timeout().await.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
assert!(matches!(
client.request::<String>("o", None).with_default_timeout().await.unwrap(),
Err(Error::InvalidRequestId)
));
let res: Result<String, Error> = client.request("o", rpc_params![]).with_default_timeout().await.unwrap();
assert!(matches!(res, Err(Error::InvalidRequestId)));
}

#[tokio::test]
Expand All @@ -67,7 +66,7 @@ async fn method_call_with_id_str() {
.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
let response: String = client.request::<String>("o", None).with_default_timeout().await.unwrap().unwrap();
let response: String = client.request("o", rpc_params![]).with_default_timeout().await.unwrap().unwrap();
assert_eq!(&response, exp);
}

Expand All @@ -77,7 +76,7 @@ async fn notification_works() {
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().build(&uri).unwrap();
client
.notification("i_dont_care_about_the_response_because_the_server_should_not_respond", None)
.notification("i_dont_care_about_the_response_because_the_server_should_not_respond", rpc_params![])
.with_default_timeout()
.await
.unwrap()
Expand Down Expand Up @@ -137,8 +136,10 @@ async fn subscription_response_to_request() {

#[tokio::test]
async fn batch_request_works() {
let batch_request =
vec![("say_hello", rpc_params![]), ("say_goodbye", rpc_params![0_u64, 1, 2]), ("get_swag", None)];
let mut batch_request = BatchRequestBuilder::new();
batch_request.insert("say_hello", rpc_params![]).unwrap();
batch_request.insert("say_goodbye", rpc_params![0_u64, 1, 2]).unwrap();
batch_request.insert("get_swag", rpc_params![]).unwrap();
let server_response = r#"[{"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}, {"jsonrpc":"2.0","result":"here's your swag","id":2}]"#.to_string();
let response =
run_batch_request_with_response(batch_request, server_response).with_default_timeout().await.unwrap().unwrap();
Expand All @@ -147,16 +148,18 @@ async fn batch_request_works() {

#[tokio::test]
async fn batch_request_out_of_order_response() {
let batch_request =
vec![("say_hello", rpc_params! {}), ("say_goodbye", rpc_params![0_u64, 1, 2]), ("get_swag", None)];
let mut batch_request = BatchRequestBuilder::new();
batch_request.insert("say_hello", rpc_params![]).unwrap();
batch_request.insert("say_goodbye", rpc_params![0_u64, 1, 2]).unwrap();
batch_request.insert("get_swag", rpc_params![]).unwrap();
let server_response = r#"[{"jsonrpc":"2.0","result":"here's your swag","id":2}, {"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}]"#.to_string();
let response =
run_batch_request_with_response(batch_request, server_response).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response, vec!["hello".to_string(), "goodbye".to_string(), "here's your swag".to_string()]);
}

async fn run_batch_request_with_response<'a>(
batch: Vec<(&'a str, Option<ParamsSer<'a>>)>,
async fn run_batch_request_with_response(
batch: BatchRequestBuilder<'_>,
response: String,
) -> Result<Vec<String>, Error> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
Expand All @@ -169,7 +172,7 @@ async fn run_request_with_response(response: String) -> Result<String, Error> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().build(&uri).unwrap();
client.request("say_hello", None).with_default_timeout().await.unwrap()
client.request("say_hello", rpc_params![]).with_default_timeout().await.unwrap()
}

fn assert_jsonrpc_error_response(err: Error, exp: ErrorObjectOwned) {
Expand Down
Loading

1 comment on commit 41b8a2c

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 41b8a2c Previous: 5a2f6f1 Ratio
sync/ws_custom_headers_handshake/1kb 229695 ns/iter (± 29640) 110640 ns/iter (± 9180) 2.08
sync/ws_custom_headers_handshake/2kb 230852 ns/iter (± 34916) 110686 ns/iter (± 2479) 2.09
async/ws_custom_headers_handshake/1kb 234382 ns/iter (± 31105) 110322 ns/iter (± 3137) 2.12
async/ws_custom_headers_handshake/2kb 225692 ns/iter (± 37544) 111106 ns/iter (± 1716) 2.03

This comment was automatically generated by workflow using github-action-benchmark.

CC: @niklasad1

Please sign in to comment.