Skip to content

Commit

Permalink
Make it possible to disable batch requests support (#744)
Browse files Browse the repository at this point in the history
  • Loading branch information
popzxc authored May 4, 2022
1 parent 816ecca commit 19aaf65
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 4 deletions.
29 changes: 27 additions & 2 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, Met
use jsonrpsee_core::server::resource_limiting::Resources;
use jsonrpsee_core::server::rpc_module::{MethodKind, Methods};
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_types::error::ErrorCode;
use jsonrpsee_types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use jsonrpsee_types::{Id, Notification, Params, Request};
use serde_json::value::RawValue;
use tokio::net::{TcpListener, ToSocketAddrs};
Expand All @@ -57,6 +57,7 @@ pub struct Builder<M = ()> {
resources: Resources,
max_request_body_size: u32,
max_response_body_size: u32,
batch_requests_supported: bool,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
middleware: M,
Expand All @@ -67,6 +68,7 @@ impl Default for Builder {
Self {
max_request_body_size: TEN_MB_SIZE_BYTES,
max_response_body_size: TEN_MB_SIZE_BYTES,
batch_requests_supported: true,
resources: Resources::default(),
access_control: AccessControl::default(),
tokio_runtime: None,
Expand Down Expand Up @@ -112,6 +114,7 @@ impl<M> Builder<M> {
Builder {
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
batch_requests_supported: self.batch_requests_supported,
resources: self.resources,
access_control: self.access_control,
tokio_runtime: self.tokio_runtime,
Expand All @@ -137,6 +140,13 @@ impl<M> Builder<M> {
self
}

/// Enables or disables support of [batch requests](https://www.jsonrpc.org/specification#batch).
/// By default, support is enabled.
pub fn batch_requests_supported(mut self, supported: bool) -> Self {
self.batch_requests_supported = supported;
self
}

/// Register a new resource kind. Errors if `label` is already registered, or if the number of
/// registered resources on this server instance would exceed 8.
///
Expand Down Expand Up @@ -199,6 +209,7 @@ impl<M> Builder<M> {
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
batch_requests_supported: self.batch_requests_supported,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
Expand Down Expand Up @@ -241,6 +252,7 @@ impl<M> Builder<M> {
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
batch_requests_supported: self.batch_requests_supported,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
Expand Down Expand Up @@ -274,6 +286,7 @@ impl<M> Builder<M> {
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
batch_requests_supported: self.batch_requests_supported,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
Expand Down Expand Up @@ -323,6 +336,8 @@ pub struct Server<M = ()> {
max_request_body_size: u32,
/// Max response body size.
max_response_body_size: u32,
/// Whether batch requests are supported by this server or not.
batch_requests_supported: bool,
/// Access control
access_control: AccessControl,
/// Tracker for currently used resources on the server
Expand All @@ -347,6 +362,7 @@ impl<M: Middleware> Server<M> {
let listener = self.listener;
let resources = self.resources;
let middleware = self.middleware;
let batch_requests_supported = self.batch_requests_supported;
let methods = methods.into().initialize_resources(&resources)?;

let make_service = make_service_fn(move |_| {
Expand Down Expand Up @@ -405,6 +421,7 @@ impl<M: Middleware> Server<M> {
resources,
max_request_body_size,
max_response_body_size,
batch_requests_supported,
)
.await?;

Expand Down Expand Up @@ -494,6 +511,7 @@ async fn process_validated_request(
resources: Resources,
max_request_body_size: u32,
max_response_body_size: u32,
batch_requests_supported: bool,
) -> Result<hyper::Response<hyper::Body>, HyperError> {
let (parts, body) = request.into_parts();

Expand Down Expand Up @@ -570,7 +588,14 @@ async fn process_validated_request(
}
// Batch of requests or notifications
} else if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&body) {
if !batch.is_empty() {
if !batch_requests_supported {
// Server was configured to not support batches.
is_single = true;
sink.send_error(
Id::Null,
ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
);
} else if !batch.is_empty() {
let middleware = &middleware;

join_all(batch.into_iter().filter_map(move |req| {
Expand Down
22 changes: 22 additions & 0 deletions http-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,3 +448,25 @@ async fn can_set_the_max_response_size() {

handle.stop().unwrap();
}

#[tokio::test]
async fn disabled_batches() {
let addr = "127.0.0.1:0";
// Disable batches support.
let server = HttpServerBuilder::default().batch_requests_supported(false).build(addr).await.unwrap();
let mut module = RpcModule::new(());
module.register_method("should_ok", |_, _ctx| Ok("ok")).unwrap();
let addr = server.local_addr().unwrap();
let uri = to_http_uri(addr);
let handle = server.start(module).unwrap();

// Send a valid batch.
let req = r#"[
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":1},
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":2}
]"#;
let response = http_request(req.into(), uri.clone()).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response.body, batches_not_supported());

handle.stop().unwrap();
}
4 changes: 4 additions & 0 deletions test-utils/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ pub fn oversized_request() -> String {
r#"{"jsonrpc":"2.0","error":{"code":-32701,"message":"Request is too big"},"id":null}"#.into()
}

pub fn batches_not_supported() -> String {
r#"{"jsonrpc":"2.0","error":{"code":-32005,"message":"Batched requests are not supported by this server"},"id":null}"#.into()
}

pub fn oversized_response(id: Id, max_limit: u32) -> String {
format!(
r#"{{"jsonrpc":"2.0","error":{{"code":-32702,"message":"Response is too big","data":"Exceeded max limit {}"}},"id":{}}}"#,
Expand Down
4 changes: 4 additions & 0 deletions types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ pub const UNKNOWN_ERROR_CODE: i32 = -32001;
pub const SUBSCRIPTION_CLOSED: i32 = -32003;
/// Subscription got closed by the server.
pub const SUBSCRIPTION_CLOSED_WITH_ERROR: i32 = -32004;
/// Batched requests are not supported by the server.
pub const BATCHES_NOT_SUPPORTED_CODE: i32 = -32005;

/// Parse error message
pub const PARSE_ERROR_MSG: &str = "Parse error";
Expand All @@ -199,6 +201,8 @@ pub const METHOD_NOT_FOUND_MSG: &str = "Method not found";
pub const SERVER_IS_BUSY_MSG: &str = "Server is busy, try again later";
/// Reserved for implementation-defined server-errors.
pub const SERVER_ERROR_MSG: &str = "Server error";
/// Batched requests not supported error message.
pub const BATCHES_NOT_SUPPORTED_MSG: &str = "Batched requests are not supported by this server";

/// JSONRPC error code
#[derive(Error, Debug, PartialEq, Copy, Clone)]
Expand Down
22 changes: 20 additions & 2 deletions ws-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use crate::future::{FutureDriver, ServerHandle, StopMonitor};
use crate::types::error::ErrorCode;
use crate::types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use crate::types::{Id, Request};
use futures_channel::mpsc;
use futures_util::future::{join_all, FutureExt};
Expand Down Expand Up @@ -270,6 +270,7 @@ where
resources.clone(),
cfg.max_request_body_size,
cfg.max_response_body_size,
cfg.batch_requests_supported,
BoundedSubscriptions::new(cfg.max_subscriptions_per_connection),
stop_monitor.clone(),
middleware,
Expand All @@ -292,6 +293,7 @@ async fn background_task(
resources: Resources,
max_request_body_size: u32,
max_response_body_size: u32,
batch_requests_supported: bool,
bounded_subscriptions: BoundedSubscriptions,
stop_server: StopMonitor,
middleware: impl Middleware,
Expand Down Expand Up @@ -490,7 +492,13 @@ async fn background_task(
if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&d) {
tracing::debug!("recv batch len={}", batch.len());
tracing::trace!("recv: batch={:?}", batch);
if !batch.is_empty() {
if !batch_requests_supported {
sink.send_error(
Id::Null,
ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
);
middleware.on_response(request_start);
} else if !batch.is_empty() {
join_all(batch.into_iter().filter_map(move |req| {
let id = req.id.clone();
let params = Params::new(req.params.map(|params| params.get()));
Expand Down Expand Up @@ -656,6 +664,8 @@ struct Settings {
allowed_origins: AllowedValue,
/// Policy by which to accept or deny incoming requests based on the `Host` header.
allowed_hosts: AllowedValue,
/// Whether batch requests are supported by this server or not.
batch_requests_supported: bool,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
}
Expand All @@ -667,6 +677,7 @@ impl Default for Settings {
max_response_body_size: TEN_MB_SIZE_BYTES,
max_subscriptions_per_connection: 1024,
max_connections: MAX_CONNECTIONS,
batch_requests_supported: true,
allowed_origins: AllowedValue::Any,
allowed_hosts: AllowedValue::Any,
tokio_runtime: None,
Expand Down Expand Up @@ -720,6 +731,13 @@ impl<M> Builder<M> {
self
}

/// Enables or disables support of [batch requests](https://www.jsonrpc.org/specification#batch).
/// By default, support is enabled.
pub fn batch_requests_supported(mut self, supported: bool) -> Self {
self.settings.batch_requests_supported = supported;
self
}

/// Set the maximum number of connections allowed. Default is 1024.
pub fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
self.settings.max_subscriptions_per_connection = max;
Expand Down
29 changes: 29 additions & 0 deletions ws-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,3 +692,32 @@ async fn custom_subscription_id_works() {
let unsub = client.send_request_text(call("unsubscribe_hello", vec!["0xdeadbeef"], Id::Num(1))).await.unwrap();
assert_eq!(&unsub, r#"{"jsonrpc":"2.0","result":true,"id":1}"#);
}

#[tokio::test]
async fn disabled_batches() {
// Disable batches support.
let server = WsServerBuilder::default()
.batch_requests_supported(false)
.build("127.0.0.1:0")
.with_default_timeout()
.await
.unwrap()
.unwrap();

let mut module = RpcModule::new(());
module.register_method("should_ok", |_, _ctx| Ok("ok")).unwrap();
let addr = server.local_addr().unwrap();

let handle = server.start(module).unwrap();

// Send a valid batch.
let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap();
let req = r#"[
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":1},
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":2}
]"#;
let response = client.send_request_text(req).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response, batches_not_supported());

handle.stop().unwrap();
}

0 comments on commit 19aaf65

Please sign in to comment.