Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize serialization for client parameters #864

Merged
merged 42 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
f28b449
core: Fix doc typo
lexnv Aug 24, 2022
febd278
types: Implement generic `ParamBuilder` for RPC parameters
lexnv Aug 24, 2022
d50de5e
types: Add specialized RPC parameter builder for arrays and maps
lexnv Aug 24, 2022
7af14ff
types: Implement parameter builder for batch requests
lexnv Aug 26, 2022
98c0058
types: Implement `rpc_params` in the `types` crate
lexnv Aug 26, 2022
bf7cd86
core: Adjust `ClientT` for generic efficient parameters
lexnv Aug 26, 2022
32c5f51
proc-macro: Render clients using the parameter builders
lexnv Aug 26, 2022
1bc535f
Adjust testing to the `ToRpcParams` interface
lexnv Aug 26, 2022
315ee89
core: Move `rpc_params` to core and simplify testing
lexnv Aug 26, 2022
555f53b
core: Rename server's trait to `ToRpcServerParams`
lexnv Aug 26, 2022
92f9e9e
bench: Adjust benches to the `ToRpcParams` interface
lexnv Aug 29, 2022
cf941b7
Fix clippy
lexnv Aug 29, 2022
c48cc4a
types: Rename batch builder to `BatchRequestBuilder`
lexnv Aug 29, 2022
89b03c5
examples: Re-enable proc-macro example
lexnv Aug 29, 2022
07a888f
types: Fix doc tests and add panic documentation
lexnv Aug 29, 2022
faf016c
core: Fix documentation link
lexnv Aug 29, 2022
7485bbc
client: Use BatchRequestBuilder as parameter for batch requests
lexnv Aug 30, 2022
0e3e58e
Update core/src/server/rpc_module.rs
lexnv Aug 31, 2022
ee213d5
Update core/src/server/rpc_module.rs
lexnv Aug 31, 2022
b460c94
types: Add specialized constructors for internal `ParamsBuilder`
lexnv Aug 31, 2022
706a672
types: Implement `EmptyParams` for client's parameters
lexnv Aug 31, 2022
0d80222
tests: Fix macos disabled test
lexnv Aug 31, 2022
20e357b
types: Improve comment
lexnv Aug 31, 2022
9590d1e
Fix clippy
lexnv Aug 31, 2022
4789f9e
benches: Rename functions
lexnv Aug 31, 2022
6f097b7
types: Rename param types to `ArrayParams` and `ObjectParams`
lexnv Aug 31, 2022
7df0f55
Move paramters to core crate
lexnv Aug 31, 2022
22f31dc
core: Return `core::Error` from `ToRpcParams` trait
lexnv Aug 31, 2022
0129bec
Fix doc link
lexnv Aug 31, 2022
0d2ccd8
Fix `ArrayParamsBuilder` doc links
lexnv Sep 1, 2022
126c27a
Remove `ToRpcServerParams` trait
lexnv Sep 1, 2022
a9a582d
core: Fix `ToRpcParams` docs
lexnv Sep 2, 2022
83d94c6
Remove `ParamsSer` and extend benchmarking
lexnv Sep 2, 2022
bbdcc38
core: Optimise `rpc_params` to avoid allocation on error
lexnv Sep 2, 2022
e09181a
params: zero allocation for empty params
lexnv Sep 2, 2022
5c85471
examples: Add copyright back
lexnv Sep 2, 2022
202d5bc
traits: Remove empty doc line
lexnv Sep 2, 2022
5a4a463
Update core/src/traits.rs
lexnv Sep 2, 2022
c1f1dcb
Update core/src/traits.rs
lexnv Sep 2, 2022
2fabe37
examples: Restore `proc_macro` example to origin/master
lexnv Sep 5, 2022
75d6b8b
core: Remove empty case for `rpc_params` macro
lexnv Sep 5, 2022
c85670e
Merge remote-tracking branch 'origin/862_param_ser_v4' into 862_param…
lexnv Sep 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 62 additions & 18 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use futures_util::future::{join_all, FutureExt};
use futures_util::stream::FuturesUnordered;
use helpers::{http_client, ws_client, SUB_METHOD_NAME, UNSUB_METHOD_NAME};
use jsonrpsee::core::client::{ClientT, SubscriptionClientT};
use jsonrpsee::core::params::{ArrayParams, BatchRequestBuilder, ObjectParams};
use jsonrpsee::core::traits::ToRpcParams;
use jsonrpsee::http_client::HeaderMap;
use jsonrpsee::types::{Id, ParamsSer, RequestSer};
use jsonrpsee::types::{Id, RequestSer};
use pprof::criterion::{Output, PProfProfiler};
use tokio::runtime::Runtime as TokioRuntime;

Expand Down Expand Up @@ -63,22 +65,46 @@ fn v2_serialize(req: RequestSer<'_>) -> String {
}

pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
crit.bench_function("jsonrpsee_types_v2_array_ref", |b| {
// Construct the serialized array request using the `RawValue` directly.
crit.bench_function("jsonrpsee_types_array_params_baseline", |b| {
b.iter(|| {
lexnv marked this conversation as resolved.
Show resolved Hide resolved
let params = &[1_u64.into(), 2_u32.into()];
let params = ParamsSer::ArrayRef(params);
let params = serde_json::value::RawValue::from_string("[1, 2]".to_string()).unwrap();

let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
v2_serialize(request);
})
});
// Construct the serialized request using the `ArrayParams`.
crit.bench_function("jsonrpsee_types_array_params", |b| {
b.iter(|| {
let mut builder = ArrayParams::new();
builder.insert(1u64).unwrap();
builder.insert(2u32).unwrap();
let params = builder.to_rpc_params().expect("Valid params");
let request = RequestSer::new(&Id::Number(0), "say_hello", params);
v2_serialize(request);
})
});

crit.bench_function("jsonrpsee_types_v2_vec", |b| {
// Construct the serialized object request using the `RawValue` directly.
crit.bench_function("jsonrpsee_types_object_params_baseline", |b| {
b.iter(|| {
let params = ParamsSer::Array(vec![1_u64.into(), 2_u32.into()]);
let params = serde_json::value::RawValue::from_string(r#"{"key": 1}"#.to_string()).unwrap();

let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
v2_serialize(request);
})
});
// Construct the serialized request using the `ObjectParams`.
crit.bench_function("jsonrpsee_types_object_params", |b| {
b.iter(|| {
let mut builder = ObjectParams::new();
builder.insert("key", 1u32).unwrap();
let params = builder.to_rpc_params().expect("Valid params");
let request = RequestSer::new(&Id::Number(0), "say_hello", params);
v2_serialize(request);
})
});
}

trait RequestBencher {
Expand Down Expand Up @@ -129,7 +155,7 @@ fn round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl ClientT>
let bench_name = format!("{}/{}", name, method);
crit.bench_function(&request.group_name(&bench_name), |b| {
b.to_async(rt).iter(|| async {
black_box(client.request::<String>(method, None).await.unwrap());
black_box(client.request::<String, ArrayParams>(method, ArrayParams::new()).await.unwrap());
})
});
}
Expand All @@ -139,7 +165,12 @@ fn sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Subs
let mut group = crit.benchmark_group(name);
group.bench_function("subscribe", |b| {
b.to_async(rt).iter_with_large_drop(|| async {
black_box(client.subscribe::<String>(SUB_METHOD_NAME, None, UNSUB_METHOD_NAME).await.unwrap());
black_box(
client
.subscribe::<String, ArrayParams>(SUB_METHOD_NAME, ArrayParams::new(), UNSUB_METHOD_NAME)
.await
.unwrap(),
);
})
});
group.bench_function("subscribe_response", |b| {
Expand All @@ -149,7 +180,10 @@ fn sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Subs
// runtime context and simply calling `block_on` here will cause the code to panic.
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
client.subscribe::<String>(SUB_METHOD_NAME, None, UNSUB_METHOD_NAME).await.unwrap()
client
.subscribe::<String, ArrayParams>(SUB_METHOD_NAME, ArrayParams::new(), UNSUB_METHOD_NAME)
.await
.unwrap()
})
})
},
Expand All @@ -166,7 +200,10 @@ fn sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Subs
b.iter_with_setup(
|| {
rt.block_on(async {
client.subscribe::<String>(SUB_METHOD_NAME, None, UNSUB_METHOD_NAME).await.unwrap()
client
.subscribe::<String, ArrayParams>(SUB_METHOD_NAME, ArrayParams::new(), UNSUB_METHOD_NAME)
.await
.unwrap()
})
},
|sub| {
Expand All @@ -191,7 +228,10 @@ fn batch_round_trip(
let bench_name = format!("{}/{}", name, method);
let mut group = crit.benchmark_group(request.group_name(&bench_name));
for batch_size in [2, 5, 10, 50, 100usize].iter() {
let batch = vec![(method, None); *batch_size];
let mut batch = BatchRequestBuilder::new();
for _ in 0..*batch_size {
batch.insert(method, ArrayParams::new()).unwrap();
}
group.throughput(Throughput::Elements(*batch_size as u64));
group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| {
b.to_async(rt).iter(|| async { client.batch_request::<String>(batch.clone()).await.unwrap() })
Expand Down Expand Up @@ -227,7 +267,7 @@ fn ws_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str,
let futs = FuturesUnordered::new();

for _ in 0..10 {
futs.push(client.request::<String>(methods[0], None));
futs.push(client.request::<String, ArrayParams>(methods[0], ArrayParams::new()));
}

join_all(futs).await;
Expand Down Expand Up @@ -267,13 +307,17 @@ fn ws_concurrent_conn_subs(rt: &TokioRuntime, crit: &mut Criterion, url: &str, n
let futs = FuturesUnordered::new();

for _ in 0..10 {
let fut = client.subscribe::<String>(SUB_METHOD_NAME, None, UNSUB_METHOD_NAME).then(
|sub| async move {
let fut = client
.subscribe::<String, ArrayParams>(
SUB_METHOD_NAME,
ArrayParams::new(),
UNSUB_METHOD_NAME,
)
.then(|sub| async move {
let mut s = sub.unwrap();

s.next().await.unwrap().unwrap()
},
);
});

futs.push(Box::pin(fut));
}
Expand Down Expand Up @@ -301,7 +345,7 @@ fn http_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str
|clients| async {
let tasks = clients.map(|client| {
rt.spawn(async move {
client.request::<String>(method, None).await.unwrap();
client.request::<String, ArrayParams>(method, ArrayParams::new()).await.unwrap();
})
});
join_all(tasks).await;
Expand Down Expand Up @@ -333,7 +377,7 @@ fn http_custom_headers_round_trip(

crit.bench_function(&request.group_name(&bench_name), |b| {
b.to_async(rt).iter(|| async {
black_box(client.request::<String>(method_name, None).await.unwrap());
black_box(client.request::<String, ArrayParams>(method_name, ArrayParams::new()).await.unwrap());
})
});
}
Expand Down
23 changes: 17 additions & 6 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ use std::sync::Arc;
use std::time::Duration;

use crate::transport::HttpTransportClient;
use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response};
use crate::types::{ErrorResponse, Id, NotificationSer, RequestSer, Response};
use async_trait::async_trait;
use hyper::http::HeaderMap;
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::tracing::RpcTracing;
use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::CallError;
use rustc_hash::FxHashMap;
Expand Down Expand Up @@ -166,9 +168,13 @@ pub struct HttpClient {

#[async_trait]
impl ClientT for HttpClient {
async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
where
Params: ToRpcParams + Send,
{
let trace = RpcTracing::notification(method);
async {
let params = params.to_rpc_params()?;
let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;

let fut = self.transport.send(notif);
Expand All @@ -184,12 +190,15 @@ impl ClientT for HttpClient {
}

/// Perform a request towards the server.
async fn request<'a, R>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<R, Error>
async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, Error>
where
R: DeserializeOwned,
Params: ToRpcParams + Send,
{
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let params = params.to_rpc_params()?;

let request = RequestSer::new(&id, method, params);
let trace = RpcTracing::method_call(method);

Expand Down Expand Up @@ -225,10 +234,11 @@ impl ClientT for HttpClient {
.await
}

async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option<ParamsSer<'a>>)>) -> Result<Vec<R>, Error>
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<Vec<R>, Error>
Copy link
Member

@niklasad1 niklasad1 Aug 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this API is a bit different than the others i.e, takes the concrete type BatchRequestBuilder instead of ToRpcParams I think that makes sense as it supports both positional params (array) and named params (map) but not possible for folks to use there own impl of this.

Indeed it makes this API easier to understand and cleaner, cool

where
R: DeserializeOwned + Default + Clone,
{
let batch = batch.build();
let guard = self.id_manager.next_request_ids(batch.len())?;
let ids: Vec<Id> = guard.inner();
let trace = RpcTracing::batch();
Expand Down Expand Up @@ -279,13 +289,14 @@ impl ClientT for HttpClient {
#[async_trait]
impl SubscriptionClientT for HttpClient {
/// Send a subscription request to the server. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`].
async fn subscribe<'a, N>(
async fn subscribe<'a, N, Params>(
&self,
_subscribe_method: &'a str,
_params: Option<ParamsSer<'a>>,
_params: Params,
_unsubscribe_method: &'a str,
) -> Result<Subscription<N>, Error>
where
Params: ToRpcParams + Send,
N: DeserializeOwned,
{
Err(Error::HttpNotImplemented)
Expand Down
31 changes: 17 additions & 14 deletions client/http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
// DEALINGS IN THE SOFTWARE.

use crate::types::error::{ErrorCode, ErrorObject};
use crate::types::ParamsSer;

use crate::HttpClientBuilder;
use jsonrpsee_core::client::{ClientT, IdKind};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::rpc_params;
use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
Expand All @@ -52,10 +53,8 @@ async fn method_call_with_wrong_id_kind() {
http_server_with_hardcoded_response(ok_response(exp.into(), Id::Num(0))).with_default_timeout().await.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
assert!(matches!(
client.request::<String>("o", None).with_default_timeout().await.unwrap(),
Err(Error::InvalidRequestId)
));
let res: Result<String, Error> = client.request("o", rpc_params![]).with_default_timeout().await.unwrap();
assert!(matches!(res, Err(Error::InvalidRequestId)));
}

#[tokio::test]
Expand All @@ -67,7 +66,7 @@ async fn method_call_with_id_str() {
.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
let response: String = client.request::<String>("o", None).with_default_timeout().await.unwrap().unwrap();
let response: String = client.request("o", rpc_params![]).with_default_timeout().await.unwrap().unwrap();
assert_eq!(&response, exp);
}

Expand All @@ -77,7 +76,7 @@ async fn notification_works() {
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().build(&uri).unwrap();
client
.notification("i_dont_care_about_the_response_because_the_server_should_not_respond", None)
.notification("i_dont_care_about_the_response_because_the_server_should_not_respond", rpc_params![])
.with_default_timeout()
.await
.unwrap()
Expand Down Expand Up @@ -137,8 +136,10 @@ async fn subscription_response_to_request() {

#[tokio::test]
async fn batch_request_works() {
let batch_request =
vec![("say_hello", rpc_params![]), ("say_goodbye", rpc_params![0_u64, 1, 2]), ("get_swag", None)];
let mut batch_request = BatchRequestBuilder::new();
batch_request.insert("say_hello", rpc_params![]).unwrap();
batch_request.insert("say_goodbye", rpc_params![0_u64, 1, 2]).unwrap();
batch_request.insert("get_swag", rpc_params![]).unwrap();
let server_response = r#"[{"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}, {"jsonrpc":"2.0","result":"here's your swag","id":2}]"#.to_string();
let response =
run_batch_request_with_response(batch_request, server_response).with_default_timeout().await.unwrap().unwrap();
Expand All @@ -147,16 +148,18 @@ async fn batch_request_works() {

#[tokio::test]
async fn batch_request_out_of_order_response() {
let batch_request =
vec![("say_hello", rpc_params! {}), ("say_goodbye", rpc_params![0_u64, 1, 2]), ("get_swag", None)];
let mut batch_request = BatchRequestBuilder::new();
batch_request.insert("say_hello", rpc_params![]).unwrap();
batch_request.insert("say_goodbye", rpc_params![0_u64, 1, 2]).unwrap();
batch_request.insert("get_swag", rpc_params![]).unwrap();
let server_response = r#"[{"jsonrpc":"2.0","result":"here's your swag","id":2}, {"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}]"#.to_string();
let response =
run_batch_request_with_response(batch_request, server_response).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response, vec!["hello".to_string(), "goodbye".to_string(), "here's your swag".to_string()]);
}

async fn run_batch_request_with_response<'a>(
batch: Vec<(&'a str, Option<ParamsSer<'a>>)>,
async fn run_batch_request_with_response(
batch: BatchRequestBuilder<'_>,
response: String,
) -> Result<Vec<String>, Error> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
Expand All @@ -169,7 +172,7 @@ async fn run_request_with_response(response: String) -> Result<String, Error> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().build(&uri).unwrap();
client.request("say_hello", None).with_default_timeout().await.unwrap()
client.request("say_hello", rpc_params![]).with_default_timeout().await.unwrap()
}

fn assert_jsonrpc_error_response(err: Error, exp: ErrorObjectOwned) {
Expand Down
Loading