Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't allocate until we know it's worth it #420

Merged
merged 11 commits into from
Aug 16, 2021
9 changes: 7 additions & 2 deletions http-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ impl HttpTransportClient {
}
}

/// Send serialized message and wait until all bytes from the HTTP message body is read.
/// Send serialized message and wait until all bytes from the HTTP message body have been read.
pub(crate) async fn send_and_read_body(&self, body: String) -> Result<Vec<u8>, Error> {
let response = self.inner_send(body).await?;
let (parts, body) = response.into_parts();
let body = http_helpers::read_response_to_body(&parts.headers, body, self.max_request_body_size).await?;
let (body, _) = http_helpers::read_body(&parts.headers, body, self.max_request_body_size).await?;
Ok(body)
}

Expand Down Expand Up @@ -95,6 +95,10 @@ pub(crate) enum Error {
/// Request body too large.
#[error("The request body was too large")]
RequestTooLarge,

/// Malformed request.
#[error("Malformed request")]
Malformed,
}

impl<T> From<GenericTransportError<T>> for Error
Expand All @@ -104,6 +108,7 @@ where
fn from(err: GenericTransportError<T>) -> Self {
match err {
GenericTransportError::<T>::TooLarge => Self::RequestTooLarge,
GenericTransportError::<T>::Malformed => Self::Malformed,
GenericTransportError::<T>::Inner(e) => Self::Http(Box::new(e)),
}
}
Expand Down
79 changes: 57 additions & 22 deletions http-server/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,63 +26,98 @@

//! Contains common builders for hyper responses.

/// Create a response for plaintext internal error.
pub fn internal_error<T: Into<String>>(msg: T) -> hyper::Response<hyper::Body> {
from_template(hyper::StatusCode::INTERNAL_SERVER_ERROR, format!("Internal Server Error: {}", msg.into()))
use crate::types::v2::{
error::{JsonRpcError, JsonRpcErrorCode},
params::{Id, TwoPointZero},
};

const JSON: &str = "application/json; charset=utf-8";
const TEXT: &str = "text/plain";

/// Create a response for json internal error.
pub fn internal_error() -> hyper::Response<hyper::Body> {
let error = serde_json::to_string(&JsonRpcError {
jsonrpc: TwoPointZero,
error: JsonRpcErrorCode::InternalError.into(),
id: Id::Null,
})
.expect("built from known-good data; qed");

from_template(hyper::StatusCode::INTERNAL_SERVER_ERROR, error, JSON)
}

/// Create a response for not allowed hosts.
/// Create a text/plain response for not allowed hosts.
pub fn host_not_allowed() -> hyper::Response<hyper::Body> {
from_template(hyper::StatusCode::FORBIDDEN, "Provided Host header is not whitelisted.\n".to_owned())
from_template(hyper::StatusCode::FORBIDDEN, "Provided Host header is not whitelisted.\n".to_owned(), TEXT)
}

/// Create a response for disallowed method used.
/// Create a text/plain response for disallowed method used.
pub fn method_not_allowed() -> hyper::Response<hyper::Body> {
from_template(
hyper::StatusCode::METHOD_NOT_ALLOWED,
"Used HTTP Method is not allowed. POST or OPTIONS is required\n".to_owned(),
TEXT,
)
}

/// CORS invalid
/// Create a text/plain response for invalid CORS "Origin" headers.
pub fn invalid_allow_origin() -> hyper::Response<hyper::Body> {
from_template(
hyper::StatusCode::FORBIDDEN,
"Origin of the request is not whitelisted. CORS headers would not be sent and any side-effects were cancelled as well.\n".to_owned(),
TEXT,
)
}

/// CORS header invalid
/// Create a text/plain response for invalid CORS "Allow-*" headers.
pub fn invalid_allow_headers() -> hyper::Response<hyper::Body> {
from_template(
hyper::StatusCode::FORBIDDEN,
"Requested headers are not allowed for CORS. CORS headers would not be sent and any side-effects were cancelled as well.\n".to_owned(),
TEXT,
)
}

/// Create a response for too large (413)
pub fn too_large<S: Into<String>>(msg: S) -> hyper::Response<hyper::Body> {
from_template(hyper::StatusCode::PAYLOAD_TOO_LARGE, msg.into())
/// Create a json response for oversized requests (413)
pub fn too_large() -> hyper::Response<hyper::Body> {
let error = serde_json::to_string(&JsonRpcError {
jsonrpc: TwoPointZero,
error: JsonRpcErrorCode::OversizedRequest.into(),
id: Id::Null,
})
.expect("built from known-good data; qed");

from_template(hyper::StatusCode::PAYLOAD_TOO_LARGE, error, JSON)
}

/// Create a text response for a template.
fn from_template(status: hyper::StatusCode, body: String) -> hyper::Response<hyper::Body> {
/// Create a json response for empty or malformed requests (400)
pub fn malformed() -> hyper::Response<hyper::Body> {
let error = serde_json::to_string(&JsonRpcError {
jsonrpc: TwoPointZero,
error: JsonRpcErrorCode::ParseError.into(),
id: Id::Null,
})
.expect("built from known-good data; qed");

from_template(hyper::StatusCode::BAD_REQUEST, error, JSON)
}

/// Create a response body.
fn from_template<S: Into<hyper::Body>>(
status: hyper::StatusCode,
body: S,
content_type: &'static str,
) -> hyper::Response<hyper::Body> {
hyper::Response::builder()
.status(status)
.header("content-type", hyper::header::HeaderValue::from_static("text/plain; charset=utf-8"))
.body(hyper::Body::from(body))
.header("content-type", hyper::header::HeaderValue::from_static(content_type))
.body(body.into())
// Parsing `StatusCode` and `HeaderValue` is infalliable but
// parsing body content is not.
.expect("Unable to parse response body for type conversion")
}

/// Create a valid JSON response.
pub fn ok_response(body: String) -> hyper::Response<hyper::Body> {
hyper::Response::builder()
.status(hyper::StatusCode::OK)
.header("content-type", hyper::header::HeaderValue::from_static("application/json; charset=utf-8"))
.body(hyper::Body::from(body))
// Parsing `StatusCode` and `HeaderValue` is infalliable but
// parsing body content is not.
.expect("Unable to parse response body for type conversion")
from_template(hyper::StatusCode::OK, body, JSON)
}
81 changes: 41 additions & 40 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use jsonrpsee_types::{
},
TEN_MB_SIZE_BYTES,
};
use jsonrpsee_utils::http_helpers::read_response_to_body;
use jsonrpsee_utils::http_helpers::read_body;
use jsonrpsee_utils::server::{
helpers::{collect_batch_response, prepare_error, send_error},
rpc_module::Methods,
Expand Down Expand Up @@ -201,60 +201,61 @@ impl Server {
}

let (parts, body) = request.into_parts();
let body = match read_response_to_body(&parts.headers, body, max_request_body_size).await {
Ok(body) => body,
Err(GenericTransportError::TooLarge) => {
return Ok::<_, HyperError>(response::too_large("The request was too large"))
}

let (body, mut is_single) = match read_body(&parts.headers, body, max_request_body_size).await {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the relevant change of the PR.

Ok(r) => r,
Err(GenericTransportError::TooLarge) => return Ok::<_, HyperError>(response::too_large()),
Err(GenericTransportError::Malformed) => return Ok::<_, HyperError>(response::malformed()),
Err(GenericTransportError::Inner(e)) => {
return Ok::<_, HyperError>(response::internal_error(e.to_string()))
log::error!("Internal error reading request body: {}", e);
return Ok::<_, HyperError>(response::internal_error());
}
};

// NOTE(niklasad1): it's a channel because it's needed for batch requests.
let (tx, mut rx) = mpsc::unbounded::<String>();
// Is this a single request or a batch (or error)?
let mut single = true;
type Notif<'a> = JsonRpcNotification<'a, Option<&'a RawValue>>;
match body.get(0) {
// Single request or notification
Some(b'{') => {
if let Ok(req) = serde_json::from_slice::<JsonRpcRequest>(&body) {
// NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here.
methods.execute(&tx, req, 0).await;
} else if let Ok(_req) = serde_json::from_slice::<Notif>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
let (id, code) = prepare_error(&body);
send_error(id, &tx, code.into());
}

// Single request or notification
if is_single {
if let Ok(req) = serde_json::from_slice::<JsonRpcRequest>(&body) {
// NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here.
methods.execute(&tx, req, 0).await;
} else if let Ok(_req) = serde_json::from_slice::<Notif>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
let (id, code) = prepare_error(&body);
send_error(id, &tx, code.into());
}
// Bacth of requests or notifications
Some(b'[') => {
if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&body) {
if !batch.is_empty() {
single = false;
for req in batch {
methods.execute(&tx, req, 0).await;
}
} else {
send_error(Id::Null, &tx, JsonRpcErrorCode::InvalidRequest.into());
}
} else if let Ok(_batch) = serde_json::from_slice::<Vec<Notif>>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
let (id, code) = prepare_error(&body);
send_error(id, &tx, code.into());

// Batch of requests or notifications
} else if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&body) {
if !batch.is_empty() {
for req in batch {
methods.execute(&tx, req, 0).await;
}
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
send_error(Id::Null, &tx, JsonRpcErrorCode::InvalidRequest.into());
}
// Garbage request
_ => send_error(Id::Null, &tx, JsonRpcErrorCode::ParseError.into()),
} else if let Ok(_batch) = serde_json::from_slice::<Vec<Notif>>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
let (id, code) = prepare_error(&body);
send_error(id, &tx, code.into());
}

// Closes the receiving half of a channel without dropping it. This prevents any further
// messages from being sent on the channel.
rx.close();
let response = if single {
let response = if is_single {
rx.next().await.expect("Sender is still alive managed by us above; qed")
} else {
collect_batch_response(rx).await
Expand Down
3 changes: 3 additions & 0 deletions types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ pub enum GenericTransportError<T: std::error::Error + Send + Sync> {
/// Request was too large.
#[error("The request was too big")]
TooLarge,
/// Malformed request
#[error("Malformed request")]
Malformed,
/// Concrete transport error.
#[error("Transport error: {0}")]
Inner(T),
Expand Down
29 changes: 22 additions & 7 deletions utils/src/http_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@
use futures_util::stream::StreamExt;
use jsonrpsee_types::error::GenericTransportError;

/// Read a hyper response with configured `HTTP` settings.
/// Read a data from a [`hyper::Body`] and return the data if it is valid and within the allowed size range.
///
/// Returns `Ok(bytes)` if the body was in valid size range.
/// Returns `Ok((bytes, single))` if the body was in valid size range; and a bool indicating whether the JSON-RPC
/// request is a single or a batch.
/// Returns `Err` if the body was too large or the body couldn't be read.
pub async fn read_response_to_body(
pub async fn read_body(
headers: &hyper::HeaderMap,
mut body: hyper::Body,
max_request_body_size: u32,
) -> Result<Vec<u8>, GenericTransportError<hyper::Error>> {
) -> Result<(Vec<u8>, bool), GenericTransportError<hyper::Error>> {
// NOTE(niklasad1): Values bigger than `u32::MAX` will be turned into zero here. This is unlikely to occur in practice
// and for that case we fallback to allocating in the while-loop below instead of pre-allocating.
let body_size = read_header_content_length(headers).unwrap_or(0);
Expand All @@ -46,7 +47,21 @@ pub async fn read_response_to_body(
return Err(GenericTransportError::TooLarge);
}

let first_chunk =
body.next().await.ok_or(GenericTransportError::Malformed)?.map_err(GenericTransportError::Inner)?;

if first_chunk.len() > max_request_body_size as usize {
return Err(GenericTransportError::TooLarge);
}

let single = match first_chunk.get(0) {
Some(b'{') => true,
Some(b'[') => false,
_ => return Err(GenericTransportError::Malformed),
};

let mut received_data = Vec::with_capacity(body_size as usize);
received_data.extend_from_slice(&first_chunk);

while let Some(chunk) = body.next().await {
let chunk = chunk.map_err(GenericTransportError::Inner)?;
Expand All @@ -56,7 +71,7 @@ pub async fn read_response_to_body(
}
received_data.extend_from_slice(&chunk);
}
Ok(received_data)
Ok((received_data, single))
}

/// Read the `Content-Length` HTTP Header. Must fit into a `u32`; returns `None` otherwise.
Expand Down Expand Up @@ -90,13 +105,13 @@ pub fn read_header_values<'a>(

#[cfg(test)]
mod tests {
use super::{read_header_content_length, read_response_to_body};
use super::{read_body, read_header_content_length};

#[tokio::test]
async fn body_to_bytes_size_limit_works() {
let headers = hyper::header::HeaderMap::new();
let body = hyper::Body::from(vec![0; 128]);
assert!(read_response_to_body(&headers, body, 127).await.is_err());
assert!(read_body(&headers, body, 127).await.is_err());
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion utils/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub fn send_error(id: Id, tx: &MethodSink, error: JsonRpcErrorObject) {
/// Figure out if this is a sufficiently complete request that we can extract an [`Id`] out of, or just plain
/// unparseable garbage.
pub fn prepare_error(data: &[u8]) -> (Id<'_>, JsonRpcErrorCode) {
match serde_json::from_slice::<JsonRpcInvalidRequest>(&data) {
match serde_json::from_slice::<JsonRpcInvalidRequest>(data) {
Ok(JsonRpcInvalidRequest { id }) => (id, JsonRpcErrorCode::InvalidRequest),
Err(_) => (Id::Null, JsonRpcErrorCode::ParseError),
}
Expand Down