Skip to content

Commit

Permalink
[http server] Batch requests (#292)
Browse files Browse the repository at this point in the history
* WIP

* Implement draft of batch requests

* fmt

* cleanup

* Explain why we don't use an untagged enum

* Avoid allocating a Vec for single requets

* Add comment

* Add a benchmark for batch requests

* Add more tests, noting where we diverge from the spec
Fix empty batch case, i.e. `[]`

* Obey the fmt

* Update benches/bench.rs

Co-authored-by: Andrew Plaza <aplaza@liquidthink.net>

* Update http-server/src/server.rs

Co-authored-by: Andrew Plaza <aplaza@liquidthink.net>

* Add link to issue

* Explain why we're closing the receiving end of the channel.

* Limit logging of requests and response to 1kb
Add more comments
Factor out batch response collection

* Wrap comment

* tweak log line

* Benchmark batch request over different batch sizes

* fmt

Co-authored-by: Andrew Plaza <aplaza@liquidthink.net>
  • Loading branch information
dvdplm and insipx authored May 4, 2021
1 parent 11ac030 commit 2cae10b
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 24 deletions.
22 changes: 21 additions & 1 deletion benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::runtime::Runtime as TokioRuntime;

mod helpers;

criterion_group!(benches, http_requests, websocket_requests, jsonrpsee_types_v2);
criterion_group!(benches, http_requests, batched_http_requests, websocket_requests, jsonrpsee_types_v2);
criterion_main!(benches);

fn v2_serialize<'a>(req: JsonRpcCallSer<'a>) -> String {
Expand Down Expand Up @@ -47,6 +47,13 @@ pub fn http_requests(crit: &mut Criterion) {
run_concurrent_round_trip(&rt, crit, client.clone(), "http_concurrent_round_trip");
}

pub fn batched_http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
run_round_trip_with_batch(&rt, crit, client.clone(), "http batch requests");
}

pub fn websocket_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
Expand All @@ -66,6 +73,19 @@ fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Clie
});
}

/// Benchmark http batch requests over batch sizes of 2, 5, 10, 50 and 100 RPCs in each batch.
fn run_round_trip_with_batch(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str) {
let mut group = crit.benchmark_group(name);
for batch_size in [2, 5, 10, 50, 100usize].iter() {
let batch = vec![("say_hello", JsonRpcParams::NoParams); *batch_size];
group.throughput(Throughput::Elements(*batch_size as u64));
group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| {
b.iter(|| rt.block_on(async { client.batch_request::<String>(batch.clone()).await.unwrap() }))
});
}
group.finish();
}

fn run_concurrent_round_trip<C: 'static + Client + Send + Sync>(
rt: &TokioRuntime,
crit: &mut Criterion,
Expand Down
108 changes: 86 additions & 22 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,15 @@ use hyper::{
use jsonrpsee_types::error::{Error, GenericTransportError};
use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest};
use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams};
use jsonrpsee_utils::{hyper_helpers::read_response_to_body, server::send_error};
use jsonrpsee_utils::{
hyper_helpers::read_response_to_body,
server::{send_error, RpcSender},
};
use serde::Serialize;
use serde_json::value::RawValue;
use socket2::{Domain, Socket, Type};
use std::{
cmp,
net::{SocketAddr, TcpListener},
sync::Arc,
};
Expand Down Expand Up @@ -153,6 +158,30 @@ impl Server {
Ok::<_, HyperError>(service_fn(move |request| {
let methods = methods.clone();
let access_control = access_control.clone();

// Look up the "method" (i.e. function pointer) from the registered methods and run it passing in
// the params from the request. The result of the computation is sent back over the `tx` channel and
// the result(s) are collected into a `String` and sent back over the wire.
let execute =
move |id: Option<&RawValue>, tx: RpcSender, method_name: &str, params: Option<&RawValue>| {
if let Some(method) = methods.get(method_name) {
let params = RpcParams::new(params.map(|params| params.get()));
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`.
if let Err(err) = (method)(id, params, &tx, 0) {
log::error!(
"execution of method call '{}' failed: {:?}, request id={:?}",
method_name,
err,
id
);
}
} else {
send_error(id, tx, JsonRpcErrorCode::MethodNotFound.into());
}
};

// Run some validation on the http request, then read the body and try to deserialize it into one of
// two cases: a single RPC request or a batch of RPC requests.
async move {
if let Err(e) = access_control_is_valid(&access_control, &request) {
return Ok::<_, HyperError>(e);
Expand All @@ -175,31 +204,48 @@ impl Server {

// NOTE(niklasad1): it's a channel because it's needed for batch requests.
let (tx, mut rx) = mpsc::unbounded();
// Is this a single request or a batch (or error)?
let mut single = true;

match serde_json::from_slice::<JsonRpcRequest>(&body) {
Ok(req) => {
log::debug!("recv: {:?}", req);
let params = RpcParams::new(req.params.map(|params| params.get()));
if let Some(method) = methods.get(&*req.method) {
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`.
if let Err(err) = (method)(req.id, params, &tx, 0) {
log::error!("method_call: {} failed: {:?}", req.method, err);
}
} else {
send_error(req.id, &tx, JsonRpcErrorCode::MethodNotFound.into());
// For reasons outlined [here](https://github.com/serde-rs/json/issues/497), `RawValue` can't be
// used with untagged enums at the moment. This means we can't use an `SingleOrBatch` untagged
// enum here and have to try each case individually: first the single request case, then the
// batch case and lastly the error. For the worst case – unparseable input – we make three calls
// to [`serde_json::from_slice`] which is pretty annoying.
// Our [issue](https://github.com/paritytech/jsonrpsee/issues/296).
if let Ok(JsonRpcRequest { id, method: method_name, params, .. }) =
serde_json::from_slice::<JsonRpcRequest>(&body)
{
execute(id, &tx, &method_name, params);
} else if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&body) {
if !batch.is_empty() {
single = false;
for JsonRpcRequest { id, method: method_name, params, .. } in batch {
execute(id, &tx, &method_name, params);
}
} else {
send_error(None, &tx, JsonRpcErrorCode::InvalidRequest.into());
}
Err(_e) => {
let (id, code) = match serde_json::from_slice::<JsonRpcInvalidRequest>(&body) {
Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest),
Err(_) => (None, JsonRpcErrorCode::ParseError),
};
send_error(id, &tx, code.into());
}
} else {
log::error!(
"[service_fn], Cannot parse request body={:?}",
String::from_utf8_lossy(&body[..cmp::min(body.len(), 1024)])
);
let (id, code) = match serde_json::from_slice::<JsonRpcInvalidRequest>(&body) {
Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest),
Err(_) => (None, JsonRpcErrorCode::ParseError),
};
send_error(id, &tx, code.into());
}
// Closes the receiving half of a channel without dropping it. This prevents any further
// messages from being sent on the channel.
rx.close();
let response = if single {
rx.next().await.expect("Sender is still alive managed by us above; qed")
} else {
collect_batch_responses(rx).await
};

let response = rx.next().await.expect("Sender is still alive managed by us above; qed");
log::debug!("send: {:?}", response);
log::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]);
Ok::<_, HyperError>(response::ok_response(response))
}
}))
Expand All @@ -211,6 +257,24 @@ impl Server {
}
}

// Collect the results of all computations sent back on the ['Stream'] into a single `String` appropriately wrapped in
// `[`/`]`.
async fn collect_batch_responses(rx: mpsc::UnboundedReceiver<String>) -> String {
let mut buf = String::with_capacity(2048);
buf.push('[');
let mut buf = rx
.fold(buf, |mut acc, response| async {
acc = [acc, response].concat();
acc.push(',');
acc
})
.await;
// Remove trailing comma
buf.pop();
buf.push(']');
buf
}

// Checks to that access control of the received request is the same as configured.
fn access_control_is_valid(
access_control: &AccessControl,
Expand Down
87 changes: 87 additions & 0 deletions http-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ async fn single_method_call_works() {
}
}

#[tokio::test]
async fn invalid_single_method_call() {
let _ = env_logger::try_init();
let addr = server().await;
let uri = to_http_uri(addr);

let req = r#"{"jsonrpc":"2.0","method":1, "params": "bar"}"#;
let response = http_request(req.into(), uri.clone()).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(response.body, invalid_request(Id::Null));
}

#[tokio::test]
async fn single_method_call_with_params() {
let addr = server().await;
Expand All @@ -50,6 +62,81 @@ async fn single_method_call_with_params() {
assert_eq!(response.body, ok_response(JsonValue::Number(3.into()), Id::Num(1)));
}

#[tokio::test]
async fn valid_batched_method_calls() {
let _ = env_logger::try_init();

let addr = server().await;
let uri = to_http_uri(addr);

let req = r#"[
{"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1},
{"jsonrpc":"2.0","method":"add", "params":[3, 4],"id":2},
{"jsonrpc":"2.0","method":"say_hello","id":3},
{"jsonrpc":"2.0","method":"add", "params":[5, 6],"id":4}
]"#;
let response = http_request(req.into(), uri).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(
response.body,
r#"[{"jsonrpc":"2.0","result":3,"id":1},{"jsonrpc":"2.0","result":7,"id":2},{"jsonrpc":"2.0","result":"lo","id":3},{"jsonrpc":"2.0","result":11,"id":4}]"#
);
}

#[tokio::test]
async fn batched_notifications() {
let _ = env_logger::try_init();

let addr = server().await;
let uri = to_http_uri(addr);

let req = r#"[
{"jsonrpc": "2.0", "method": "notif", "params": [1,2,4]},
{"jsonrpc": "2.0", "method": "notif", "params": [7]}
]"#;
let response = http_request(req.into(), uri).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
// Note: this is *not* according to spec. Response should be the empty string, `""`.
assert_eq!(response.body, r#"[{"jsonrpc":"2.0","result":"","id":null},{"jsonrpc":"2.0","result":"","id":null}]"#);
}

#[tokio::test]
async fn invalid_batched_method_calls() {
let _ = env_logger::try_init();

let addr = server().await;
let uri = to_http_uri(addr);

// batch with no requests
let req = r#"[]"#;
let response = http_request(req.into(), uri.clone()).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(response.body, invalid_request(Id::Null));

// batch with invalid request
let req = r#"[123]"#;
let response = http_request(req.into(), uri.clone()).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
// Note: according to the spec the `id` should be `null` here, not 123.
assert_eq!(response.body, invalid_request(Id::Num(123)));

// batch with invalid request
let req = r#"[1, 2, 3]"#;
let response = http_request(req.into(), uri.clone()).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
// Note: according to the spec this should return an array of three `Invalid Request`s
assert_eq!(response.body, parse_error(Id::Null));

// invalid JSON in batch
let req = r#"[
{"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"},
{"jsonrpc": "2.0", "method"
]"#;
let response = http_request(req.into(), uri.clone()).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(response.body, parse_error(Id::Null));
}

#[tokio::test]
async fn should_return_method_not_found() {
let addr = server().await;
Expand Down
2 changes: 1 addition & 1 deletion types/src/v2/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl<'a> RpcParams<'a> {
/// If your type implement `Into<JsonValue>` call that favor of `serde_json::to:value` to
/// construct the parameters. Because `serde_json::to_value` serializes the type which
/// allocates whereas `Into<JsonValue>` doesn't in most cases.
#[derive(Serialize, Debug)]
#[derive(Serialize, Debug, Clone)]
#[serde(untagged)]
pub enum JsonRpcParams<'a> {
/// No params.
Expand Down

0 comments on commit 2cae10b

Please sign in to comment.