Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): configurable limit for batch requests. #1073

Merged
merged 5 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ where
let max_subscriptions_per_connection = self.cfg.max_subscriptions_per_connection;
let allow_hosts = self.cfg.allow_hosts;
let logger = self.logger;
let batch_requests_supported = self.cfg.batch_requests_supported;
let batch_requests_config = self.cfg.batch_requests_config;
let id_provider = self.id_provider;

let mut id: u32 = 0;
Expand All @@ -145,7 +145,7 @@ where
max_response_body_size,
max_log_length,
max_subscriptions_per_connection,
batch_requests_supported,
batch_requests_config,
id_provider: id_provider.clone(),
ping_interval: self.cfg.ping_interval,
stop_handle: stop_handle.clone(),
Expand Down Expand Up @@ -194,7 +194,7 @@ struct Settings {
/// Host filtering.
allow_hosts: AllowHosts,
/// Whether batch requests are supported by this server or not.
batch_requests_supported: bool,
batch_requests_config: BatchRequestConfig,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
/// The interval at which `Ping` frames are submitted.
Expand All @@ -207,6 +207,16 @@ struct Settings {
message_buffer_capacity: u32,
}

#[derive(Debug, Copy, Clone)]
pub enum BatchRequestConfig {
/// Batch requests are disabled.
Disabled,
/// The batch request is limited to `len`.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
Limit(u32),
/// The batch request is unlimited.
Unlimited,
}

impl Default for Settings {
fn default() -> Self {
Self {
Expand All @@ -215,7 +225,7 @@ impl Default for Settings {
max_log_length: 4096,
max_connections: MAX_CONNECTIONS,
max_subscriptions_per_connection: 1024,
batch_requests_supported: true,
batch_requests_config: BatchRequestConfig::Unlimited,
allow_hosts: AllowHosts::Any,
tokio_runtime: None,
ping_interval: Duration::from_secs(60),
Expand Down Expand Up @@ -272,10 +282,12 @@ impl<B, L> Builder<B, L> {
self
}

/// Enables or disables support of [batch requests](https://www.jsonrpc.org/specification#batch).
/// By default, support is enabled.
pub fn batch_requests_supported(mut self, supported: bool) -> Self {
self.settings.batch_requests_supported = supported;
/// Configure how [batch requests](https://www.jsonrpc.org/specification#batch) shall be handled
/// by the server.
///
/// Default: batch requests are allowed and can be arbitrary big but the maximum payload size is limited.
pub fn set_batch_request_config(mut self, cfg: BatchRequestConfig) -> Self {
self.settings.batch_requests_config = cfg;
self
}

Expand Down Expand Up @@ -556,7 +568,7 @@ pub(crate) struct ServiceData<L: Logger> {
/// Maximum number of subscriptions per connection.
pub(crate) max_subscriptions_per_connection: u32,
/// Whether batch requests are supported by this server or not.
pub(crate) batch_requests_supported: bool,
pub(crate) batch_requests_config: BatchRequestConfig,
/// Subscription ID provider.
pub(crate) id_provider: Arc<dyn IdProvider>,
/// Ping interval
Expand Down Expand Up @@ -663,7 +675,7 @@ impl<L: Logger> hyper::service::Service<hyper::Request<hyper::Body>> for TowerSe
max_request_body_size: self.inner.max_request_body_size,
max_response_body_size: self.inner.max_response_body_size,
max_log_length: self.inner.max_log_length,
batch_requests_supported: self.inner.batch_requests_supported,
batch_requests_config: self.inner.batch_requests_config,
logger: self.inner.logger.clone(),
conn: self.inner.conn.clone(),
remote_addr: self.inner.remote_addr,
Expand Down Expand Up @@ -696,7 +708,7 @@ struct ProcessConnection<L> {
/// Maximum number of subscriptions per connection.
max_subscriptions_per_connection: u32,
/// Whether batch requests are supported by this server or not.
batch_requests_supported: bool,
batch_requests_config: BatchRequestConfig,
/// Subscription ID provider.
id_provider: Arc<dyn IdProvider>,
/// Ping interval
Expand Down Expand Up @@ -764,7 +776,7 @@ fn process_connection<'a, L: Logger, B, U>(
max_response_body_size: cfg.max_response_body_size,
max_log_length: cfg.max_log_length,
max_subscriptions_per_connection: cfg.max_subscriptions_per_connection,
batch_requests_supported: cfg.batch_requests_supported,
batch_requests_config: cfg.batch_requests_config,
id_provider: cfg.id_provider,
ping_interval: cfg.ping_interval,
stop_handle: cfg.stop_handle.clone(),
Expand Down
28 changes: 27 additions & 1 deletion server/src/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

use std::net::SocketAddr;

use crate::server::BatchRequestConfig;
use crate::types::error::CallError;
use crate::{RpcModule, ServerBuilder, ServerHandle};
use jsonrpsee_core::{Error, RpcResult};
Expand Down Expand Up @@ -505,7 +506,8 @@ async fn can_set_the_max_response_size_to_batch() {
async fn disabled_batches() {
let addr = "127.0.0.1:0";
// Disable batches support.
let server = ServerBuilder::default().batch_requests_supported(false).build(addr).await.unwrap();
let server =
ServerBuilder::default().set_batch_request_config(BatchRequestConfig::Disabled).build(addr).await.unwrap();
let mut module = RpcModule::new(());
module.register_method("should_ok", |_, _ctx| "ok").unwrap();
let addr = server.local_addr().unwrap();
Expand All @@ -524,6 +526,30 @@ async fn disabled_batches() {
handle.stopped().await;
}

#[tokio::test]
async fn batch_limit_works() {
let addr = "127.0.0.1:0";
// Disable batches support.
let server =
ServerBuilder::default().set_batch_request_config(BatchRequestConfig::Limit(1)).build(addr).await.unwrap();
let mut module = RpcModule::new(());
module.register_method("should_ok", |_, _ctx| "ok").unwrap();
let addr = server.local_addr().unwrap();
let uri = to_http_uri(addr);
let handle = server.start(module).unwrap();

// Send a valid batch.
let req = r#"[
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":1},
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":2}
]"#;
let response = http_request(req.into(), uri.clone()).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response.body, batches_too_large(1));

handle.stop().unwrap();
handle.stopped().await;
}

#[tokio::test]
async fn http2_method_call_works() {
init_logger();
Expand Down
33 changes: 32 additions & 1 deletion server/src/tests/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::server::BatchRequestConfig;
use crate::tests::helpers::{deser_call, init_logger, server_with_context};
use crate::types::SubscriptionId;
use crate::{RpcModule, ServerBuilder};
Expand Down Expand Up @@ -575,7 +576,7 @@ async fn custom_subscription_id_works() {
async fn disabled_batches() {
// Disable batches support.
let server = ServerBuilder::default()
.batch_requests_supported(false)
.set_batch_request_config(BatchRequestConfig::Disabled)
.build("127.0.0.1:0")
.with_default_timeout()
.await
Expand All @@ -601,6 +602,36 @@ async fn disabled_batches() {
server_handle.stopped().await;
}

#[tokio::test]
async fn batch_limit_works() {
// Disable batches support.
let server = ServerBuilder::default()
.set_batch_request_config(BatchRequestConfig::Limit(1))
.build("127.0.0.1:0")
.with_default_timeout()
.await
.unwrap()
.unwrap();

let mut module = RpcModule::new(());
module.register_method("should_ok", |_, _ctx| "ok").unwrap();
let addr = server.local_addr().unwrap();

let server_handle = server.start(module).unwrap();

// Send a valid batch.
let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap();
let req = r#"[
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":1},
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":2}
]"#;
let response = client.send_request_text(req).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response, batches_too_large(1));

server_handle.stop().unwrap();
server_handle.stopped().await;
}

#[tokio::test]
async fn invalid_batch_calls() {
init_logger();
Expand Down
47 changes: 30 additions & 17 deletions server/src/transport/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::SocketAddr;
use std::sync::Arc;

use crate::logger::{self, Logger, TransportProtocol};
use crate::server::BatchRequestConfig;

use futures_util::future::Either;
use futures_util::stream::{FuturesOrdered, StreamExt};
Expand All @@ -13,7 +14,9 @@ use jsonrpsee_core::server::helpers::{batch_response_error, prepare_error, Batch
use jsonrpsee_core::server::{MethodCallback, Methods};
use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str};
use jsonrpsee_core::JsonRawValue;
use jsonrpsee_types::error::{ErrorCode, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use jsonrpsee_types::error::{
reject_too_big_batch_request, ErrorCode, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG,
};
use jsonrpsee_types::{ErrorObject, Id, InvalidRequest, Notification, Params, Request};
use tokio::sync::OwnedSemaphorePermit;
use tracing::instrument;
Expand Down Expand Up @@ -53,7 +56,7 @@ pub(crate) struct ProcessValidatedRequest<'a, L: Logger> {
pub(crate) max_request_body_size: u32,
pub(crate) max_response_body_size: u32,
pub(crate) max_log_length: u32,
pub(crate) batch_requests_supported: bool,
pub(crate) batch_requests_config: BatchRequestConfig,
pub(crate) request_start: L::Instant,
}

Expand All @@ -68,7 +71,7 @@ pub(crate) async fn process_validated_request<L: Logger>(
max_request_body_size,
max_response_body_size,
max_log_length,
batch_requests_supported,
batch_requests_config,
request_start,
} = input;

Expand All @@ -92,17 +95,21 @@ pub(crate) async fn process_validated_request<L: Logger>(
logger.on_response(&response.result, request_start, TransportProtocol::Http);
response::ok_response(response.result)
}
// Batch of requests or notifications
else if !batch_requests_supported {
let err = MethodResponse::error(
Id::Null,
ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
);
logger.on_response(&err.result, request_start, TransportProtocol::Http);
response::ok_response(err.result)
}
// Batch of requests or notifications
// Batch of requests.
else {
let limit = match batch_requests_config {
BatchRequestConfig::Disabled => {
let response = MethodResponse::error(
Id::Null,
ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
);
logger.on_response(&response.result, request_start, TransportProtocol::WebSocket);
return response::ok_response(response.result);
}
BatchRequestConfig::Limit(limit) => limit as usize,
BatchRequestConfig::Unlimited => usize::MAX,
};

let response = process_batch_request(Batch {
data: body,
call: CallData {
Expand All @@ -113,6 +120,7 @@ pub(crate) async fn process_validated_request<L: Logger>(
max_log_length,
request_start,
},
max_len: limit,
})
.await;
logger.on_response(&response, request_start, TransportProtocol::Http);
Expand All @@ -124,6 +132,7 @@ pub(crate) async fn process_validated_request<L: Logger>(
pub(crate) struct Batch<'a, L: Logger> {
data: Vec<u8>,
call: CallData<'a, L>,
max_len: usize,
}

#[derive(Debug, Clone)]
Expand All @@ -144,9 +153,13 @@ pub(crate) async fn process_batch_request<L>(b: Batch<'_, L>) -> String
where
L: Logger,
{
let Batch { data, call } = b;
let Batch { data, call, max_len } = b;

if let Ok(batch) = serde_json::from_slice::<Vec<&JsonRawValue>>(&data) {
if batch.len() > max_len {
return batch_response_error(Id::Null, reject_too_big_batch_request(max_len));
}

let mut got_notif = false;
let mut batch_response = BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize);

Expand Down Expand Up @@ -261,7 +274,7 @@ pub(crate) struct HandleRequest<L: Logger> {
pub(crate) max_request_body_size: u32,
pub(crate) max_response_body_size: u32,
pub(crate) max_log_length: u32,
pub(crate) batch_requests_supported: bool,
pub(crate) batch_requests_config: BatchRequestConfig,
pub(crate) logger: L,
pub(crate) conn: Arc<OwnedSemaphorePermit>,
pub(crate) remote_addr: SocketAddr,
Expand All @@ -276,7 +289,7 @@ pub(crate) async fn handle_request<L: Logger>(
max_request_body_size,
max_response_body_size,
max_log_length,
batch_requests_supported,
batch_requests_config,
logger,
conn,
remote_addr,
Expand All @@ -293,7 +306,7 @@ pub(crate) async fn handle_request<L: Logger>(
max_request_body_size,
max_response_body_size,
max_log_length,
batch_requests_supported,
batch_requests_config,
logger: &logger,
request_start,
})
Expand Down
Loading