diff --git a/server/src/server.rs b/server/src/server.rs index db9714887f..80f78a6ad8 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -123,7 +123,7 @@ where let max_subscriptions_per_connection = self.cfg.max_subscriptions_per_connection; let allow_hosts = self.cfg.allow_hosts; let logger = self.logger; - let batch_requests_supported = self.cfg.batch_requests_supported; + let batch_requests_config = self.cfg.batch_requests_config; let id_provider = self.id_provider; let mut id: u32 = 0; @@ -145,7 +145,7 @@ where max_response_body_size, max_log_length, max_subscriptions_per_connection, - batch_requests_supported, + batch_requests_config, id_provider: id_provider.clone(), ping_interval: self.cfg.ping_interval, stop_handle: stop_handle.clone(), @@ -194,7 +194,7 @@ struct Settings { /// Host filtering. allow_hosts: AllowHosts, /// Whether batch requests are supported by this server or not. - batch_requests_supported: bool, + batch_requests_config: BatchRequestConfig, /// Custom tokio runtime to run the server on. tokio_runtime: Option, /// The interval at which `Ping` frames are submitted. @@ -207,6 +207,16 @@ struct Settings { message_buffer_capacity: u32, } +#[derive(Debug, Copy, Clone)] +pub enum BatchRequestConfig { + /// Batch requests are disabled. + Disabled, + /// Each batch request is limited to `len` and any batch request bigger than `len` will not be processed. + Limit(u32), + /// The batch request is unlimited. 
+ Unlimited, +} + impl Default for Settings { fn default() -> Self { Self { @@ -215,7 +225,7 @@ impl Default for Settings { max_log_length: 4096, max_connections: MAX_CONNECTIONS, max_subscriptions_per_connection: 1024, - batch_requests_supported: true, + batch_requests_config: BatchRequestConfig::Unlimited, allow_hosts: AllowHosts::Any, tokio_runtime: None, ping_interval: Duration::from_secs(60), @@ -272,10 +282,12 @@ impl Builder { self } - /// Enables or disables support of [batch requests](https://www.jsonrpc.org/specification#batch). - /// By default, support is enabled. - pub fn batch_requests_supported(mut self, supported: bool) -> Self { - self.settings.batch_requests_supported = supported; + /// Configure how [batch requests](https://www.jsonrpc.org/specification#batch) shall be handled + /// by the server. + /// + /// Default: batch requests are allowed and can be arbitrary big but the maximum payload size is limited. + pub fn set_batch_request_config(mut self, cfg: BatchRequestConfig) -> Self { + self.settings.batch_requests_config = cfg; self } @@ -556,7 +568,7 @@ pub(crate) struct ServiceData { /// Maximum number of subscriptions per connection. pub(crate) max_subscriptions_per_connection: u32, /// Whether batch requests are supported by this server or not. - pub(crate) batch_requests_supported: bool, + pub(crate) batch_requests_config: BatchRequestConfig, /// Subscription ID provider. 
pub(crate) id_provider: Arc, /// Ping interval @@ -663,7 +675,7 @@ impl hyper::service::Service> for TowerSe max_request_body_size: self.inner.max_request_body_size, max_response_body_size: self.inner.max_response_body_size, max_log_length: self.inner.max_log_length, - batch_requests_supported: self.inner.batch_requests_supported, + batch_requests_config: self.inner.batch_requests_config, logger: self.inner.logger.clone(), conn: self.inner.conn.clone(), remote_addr: self.inner.remote_addr, @@ -696,7 +708,7 @@ struct ProcessConnection { /// Maximum number of subscriptions per connection. max_subscriptions_per_connection: u32, /// Whether batch requests are supported by this server or not. - batch_requests_supported: bool, + batch_requests_config: BatchRequestConfig, /// Subscription ID provider. id_provider: Arc, /// Ping interval @@ -764,7 +776,7 @@ fn process_connection<'a, L: Logger, B, U>( max_response_body_size: cfg.max_response_body_size, max_log_length: cfg.max_log_length, max_subscriptions_per_connection: cfg.max_subscriptions_per_connection, - batch_requests_supported: cfg.batch_requests_supported, + batch_requests_config: cfg.batch_requests_config, id_provider: cfg.id_provider, ping_interval: cfg.ping_interval, stop_handle: cfg.stop_handle.clone(), diff --git a/server/src/tests/http.rs b/server/src/tests/http.rs index f20ec99235..9673f52284 100644 --- a/server/src/tests/http.rs +++ b/server/src/tests/http.rs @@ -26,6 +26,7 @@ use std::net::SocketAddr; +use crate::server::BatchRequestConfig; use crate::types::error::CallError; use crate::{RpcModule, ServerBuilder, ServerHandle}; use jsonrpsee_core::{Error, RpcResult}; @@ -505,7 +506,8 @@ async fn can_set_the_max_response_size_to_batch() { async fn disabled_batches() { let addr = "127.0.0.1:0"; // Disable batches support. 
- let server = ServerBuilder::default().batch_requests_supported(false).build(addr).await.unwrap(); + let server = + ServerBuilder::default().set_batch_request_config(BatchRequestConfig::Disabled).build(addr).await.unwrap(); let mut module = RpcModule::new(()); module.register_method("should_ok", |_, _ctx| "ok").unwrap(); let addr = server.local_addr().unwrap(); @@ -524,6 +526,30 @@ async fn disabled_batches() { handle.stopped().await; } +#[tokio::test] +async fn batch_limit_works() { + let addr = "127.0.0.1:0"; + // Disable batches support. + let server = + ServerBuilder::default().set_batch_request_config(BatchRequestConfig::Limit(1)).build(addr).await.unwrap(); + let mut module = RpcModule::new(()); + module.register_method("should_ok", |_, _ctx| "ok").unwrap(); + let addr = server.local_addr().unwrap(); + let uri = to_http_uri(addr); + let handle = server.start(module).unwrap(); + + // Send a valid batch. + let req = r#"[ + {"jsonrpc":"2.0","method":"should_ok", "params":[],"id":1}, + {"jsonrpc":"2.0","method":"should_ok", "params":[],"id":2} + ]"#; + let response = http_request(req.into(), uri.clone()).with_default_timeout().await.unwrap().unwrap(); + assert_eq!(response.body, batches_too_large(1)); + + handle.stop().unwrap(); + handle.stopped().await; +} + #[tokio::test] async fn http2_method_call_works() { init_logger(); diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index 22066f7952..2f18ee8191 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -24,6 +24,7 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::server::BatchRequestConfig; use crate::tests::helpers::{deser_call, init_logger, server_with_context}; use crate::types::SubscriptionId; use crate::{RpcModule, ServerBuilder}; @@ -575,7 +576,7 @@ async fn custom_subscription_id_works() { async fn disabled_batches() { // Disable batches support. 
let server = ServerBuilder::default() - .batch_requests_supported(false) + .set_batch_request_config(BatchRequestConfig::Disabled) .build("127.0.0.1:0") .with_default_timeout() .await @@ -601,6 +602,36 @@ async fn disabled_batches() { server_handle.stopped().await; } +#[tokio::test] +async fn batch_limit_works() { + // Disable batches support. + let server = ServerBuilder::default() + .set_batch_request_config(BatchRequestConfig::Limit(1)) + .build("127.0.0.1:0") + .with_default_timeout() + .await + .unwrap() + .unwrap(); + + let mut module = RpcModule::new(()); + module.register_method("should_ok", |_, _ctx| "ok").unwrap(); + let addr = server.local_addr().unwrap(); + + let server_handle = server.start(module).unwrap(); + + // Send a valid batch. + let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap(); + let req = r#"[ + {"jsonrpc":"2.0","method":"should_ok", "params":[],"id":1}, + {"jsonrpc":"2.0","method":"should_ok", "params":[],"id":2} + ]"#; + let response = client.send_request_text(req).with_default_timeout().await.unwrap().unwrap(); + assert_eq!(response, batches_too_large(1)); + + server_handle.stop().unwrap(); + server_handle.stopped().await; +} + #[tokio::test] async fn invalid_batch_calls() { init_logger(); diff --git a/server/src/transport/http.rs b/server/src/transport/http.rs index 250288f8e6..71af8f8cc6 100644 --- a/server/src/transport/http.rs +++ b/server/src/transport/http.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use std::sync::Arc; use crate::logger::{self, Logger, TransportProtocol}; +use crate::server::BatchRequestConfig; use futures_util::future::Either; use futures_util::stream::{FuturesOrdered, StreamExt}; @@ -13,7 +14,9 @@ use jsonrpsee_core::server::helpers::{batch_response_error, prepare_error, Batch use jsonrpsee_core::server::{MethodCallback, Methods}; use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str}; use jsonrpsee_core::JsonRawValue; -use jsonrpsee_types::error::{ErrorCode, 
BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG}; +use jsonrpsee_types::error::{ + reject_too_big_batch_request, ErrorCode, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG, +}; use jsonrpsee_types::{ErrorObject, Id, InvalidRequest, Notification, Params, Request}; use tokio::sync::OwnedSemaphorePermit; use tracing::instrument; @@ -53,7 +56,7 @@ pub(crate) struct ProcessValidatedRequest<'a, L: Logger> { pub(crate) max_request_body_size: u32, pub(crate) max_response_body_size: u32, pub(crate) max_log_length: u32, - pub(crate) batch_requests_supported: bool, + pub(crate) batch_requests_config: BatchRequestConfig, pub(crate) request_start: L::Instant, } @@ -68,7 +71,7 @@ pub(crate) async fn process_validated_request( max_request_body_size, max_response_body_size, max_log_length, - batch_requests_supported, + batch_requests_config, request_start, } = input; @@ -92,17 +95,21 @@ pub(crate) async fn process_validated_request( logger.on_response(&response.result, request_start, TransportProtocol::Http); response::ok_response(response.result) } - // Batch of requests or notifications - else if !batch_requests_supported { - let err = MethodResponse::error( - Id::Null, - ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None), - ); - logger.on_response(&err.result, request_start, TransportProtocol::Http); - response::ok_response(err.result) - } - // Batch of requests or notifications + // Batch of requests. 
else { + let limit = match batch_requests_config { + BatchRequestConfig::Disabled => { + let response = MethodResponse::error( + Id::Null, + ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None), + ); + logger.on_response(&response.result, request_start, TransportProtocol::WebSocket); + return response::ok_response(response.result); + } + BatchRequestConfig::Limit(limit) => limit as usize, + BatchRequestConfig::Unlimited => usize::MAX, + }; + let response = process_batch_request(Batch { data: body, call: CallData { @@ -113,6 +120,7 @@ pub(crate) async fn process_validated_request( max_log_length, request_start, }, + max_len: limit, }) .await; logger.on_response(&response, request_start, TransportProtocol::Http); @@ -124,6 +132,7 @@ pub(crate) async fn process_validated_request( pub(crate) struct Batch<'a, L: Logger> { data: Vec, call: CallData<'a, L>, + max_len: usize, } #[derive(Debug, Clone)] @@ -144,9 +153,13 @@ pub(crate) async fn process_batch_request(b: Batch<'_, L>) -> String where L: Logger, { - let Batch { data, call } = b; + let Batch { data, call, max_len } = b; if let Ok(batch) = serde_json::from_slice::>(&data) { + if batch.len() > max_len { + return batch_response_error(Id::Null, reject_too_big_batch_request(max_len)); + } + let mut got_notif = false; let mut batch_response = BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize); @@ -261,7 +274,7 @@ pub(crate) struct HandleRequest { pub(crate) max_request_body_size: u32, pub(crate) max_response_body_size: u32, pub(crate) max_log_length: u32, - pub(crate) batch_requests_supported: bool, + pub(crate) batch_requests_config: BatchRequestConfig, pub(crate) logger: L, pub(crate) conn: Arc, pub(crate) remote_addr: SocketAddr, @@ -276,7 +289,7 @@ pub(crate) async fn handle_request( max_request_body_size, max_response_body_size, max_log_length, - batch_requests_supported, + batch_requests_config, logger, conn, remote_addr, @@ -293,7 +306,7 @@ pub(crate) async 
fn handle_request( max_request_body_size, max_response_body_size, max_log_length, - batch_requests_supported, + batch_requests_config, logger: &logger, request_start, }) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 2d3110c027..3d8387be82 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -2,7 +2,7 @@ use std::time::Duration; use crate::future::{FutureDriver, StopHandle}; use crate::logger::{self, Logger, TransportProtocol}; -use crate::server::ServiceData; +use crate::server::{BatchRequestConfig, ServiceData}; use futures_util::future::{self, Either}; use futures_util::io::{BufReader, BufWriter}; @@ -19,8 +19,8 @@ use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str}; use jsonrpsee_core::traits::IdProvider; use jsonrpsee_core::{Error, JsonRawValue}; use jsonrpsee_types::error::{ - reject_too_big_request, reject_too_many_subscriptions, ErrorCode, BATCHES_NOT_SUPPORTED_CODE, - BATCHES_NOT_SUPPORTED_MSG, + reject_too_big_batch_request, reject_too_big_request, reject_too_many_subscriptions, ErrorCode, + BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG, }; use jsonrpsee_types::{ErrorObject, Id, InvalidRequest, Notification, Params, Request}; use soketto::connection::Error as SokettoError; @@ -54,6 +54,7 @@ pub(crate) async fn send_ping(sender: &mut Sender) -> Result<(), Error> { pub(crate) struct Batch<'a, L: Logger> { pub(crate) data: Vec, pub(crate) call: CallData<'a, L>, + pub(crate) max_len: usize, } #[derive(Debug, Clone)] @@ -74,9 +75,13 @@ pub(crate) struct CallData<'a, L: Logger> { // complete batch response back to the client over `tx`. 
#[instrument(name = "batch", skip(b), level = "TRACE")] pub(crate) async fn process_batch_request(b: Batch<'_, L>) -> Option { - let Batch { data, call } = b; + let Batch { data, call, max_len } = b; if let Ok(batch) = serde_json::from_slice::>(&data) { + if batch.len() > max_len { + return Some(batch_response_error(Id::Null, reject_too_big_batch_request(max_len))); + } + let mut got_notif = false; let mut batch_response = BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize); @@ -231,7 +236,7 @@ pub(crate) async fn background_task( max_response_body_size, max_log_length, max_subscriptions_per_connection, - batch_requests_supported, + batch_requests_config, stop_handle, id_provider, ping_interval, @@ -342,15 +347,21 @@ pub(crate) async fn background_task( method_executor.add(fut); } - Some(b'[') if !batch_requests_supported => { - let response = MethodResponse::error( - Id::Null, - ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None), - ); - logger.on_response(&response.result, request_start, TransportProtocol::WebSocket); - sink_permit.send_raw(response.result); - } Some(b'[') => { + let limit = match batch_requests_config { + BatchRequestConfig::Disabled => { + let response = MethodResponse::error( + Id::Null, + ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None), + ); + logger.on_response(&response.result, request_start, TransportProtocol::WebSocket); + sink_permit.send_raw(response.result); + continue; + } + BatchRequestConfig::Limit(limit) => limit as usize, + BatchRequestConfig::Unlimited => usize::MAX, + }; + // Make sure the following variables are not moved into async closure below. 
let methods = &methods; let sink = sink.clone(); @@ -372,6 +383,7 @@ pub(crate) async fn background_task( logger, request_start, }, + max_len: limit, }) .await; diff --git a/test-utils/src/helpers.rs b/test-utils/src/helpers.rs index ba3e5e7180..41f9b28118 100644 --- a/test-utils/src/helpers.rs +++ b/test-utils/src/helpers.rs @@ -81,6 +81,12 @@ pub fn batches_not_supported() -> String { r#"{"jsonrpc":"2.0","error":{"code":-32005,"message":"Batched requests are not supported by this server"},"id":null}"#.into() } +pub fn batches_too_large(max_limit: usize) -> String { + format!( + r#"{{"jsonrpc":"2.0","error":{{"code":-32010,"message":"The batch request was too large","data":"Exceeded max limit of {max_limit}"}},"id":null}}"# + ) +} + pub fn oversized_response(id: Id, max_limit: u32) -> String { format!( r#"{{"jsonrpc":"2.0","error":{{"code":-32008,"message":"Response is too big","data":"Exceeded max limit of {}"}},"id":{}}}"#, diff --git a/types/src/error.rs b/types/src/error.rs index c2c7197786..c7e1a2433c 100644 --- a/types/src/error.rs +++ b/types/src/error.rs @@ -143,6 +143,8 @@ pub const OVERSIZED_REQUEST_CODE: i32 = -32007; pub const OVERSIZED_RESPONSE_CODE: i32 = -32008; /// Server is busy error code. pub const SERVER_IS_BUSY_CODE: i32 = -32009; +/// Batch request limit was exceeded. +pub const TOO_BIG_BATCH_CODE: i32 = -32010; /// Parse error message pub const PARSE_ERROR_MSG: &str = "Parse error"; @@ -166,6 +168,8 @@ pub const SERVER_ERROR_MSG: &str = "Server error"; pub const BATCHES_NOT_SUPPORTED_MSG: &str = "Batched requests are not supported by this server"; /// Subscription limit per connection was exceeded. pub const TOO_MANY_SUBSCRIPTIONS_MSG: &str = "Too many subscriptions on the connection"; +/// Batched requests limit was exceeded. 
+pub const TOO_BIG_BATCH_MSG: &str = "The batch request was too large"; /// JSONRPC error code #[derive(Error, Debug, PartialEq, Eq, Copy, Clone)] @@ -303,6 +307,11 @@ pub fn reject_too_big_request(limit: u32) -> ErrorObjectOwned { ) } +/// Helper to get a `JSON-RPC` error object when the maximum batch request size has been exceeded. +pub fn reject_too_big_batch_request(limit: usize) -> ErrorObjectOwned { + ErrorObjectOwned::owned(TOO_BIG_BATCH_CODE, TOO_BIG_BATCH_MSG, Some(format!("Exceeded max limit of {limit}"))) +} + #[cfg(test)] mod tests { use super::{ErrorCode, ErrorObject};