diff --git a/http-server/src/response.rs b/http-server/src/response.rs index 18f2313fdf..0cd24cdb6d 100644 --- a/http-server/src/response.rs +++ b/http-server/src/response.rs @@ -26,19 +26,15 @@ //! Contains common builders for hyper responses. -use crate::types::v2::{ErrorCode, Id, RpcError, TwoPointZero}; +use crate::types::v2::{ErrorCode, Id, RpcError}; const JSON: &str = "application/json; charset=utf-8"; const TEXT: &str = "text/plain"; /// Create a response for json internal error. pub fn internal_error() -> hyper::Response { - let error = serde_json::to_string(&RpcError { - jsonrpc: TwoPointZero, - error: ErrorCode::InternalError.into(), - id: Id::Null, - }) - .expect("built from known-good data; qed"); + let error = serde_json::to_string(&RpcError::new(ErrorCode::InternalError.into(), Id::Null)) + .expect("built from known-good data; qed"); from_template(hyper::StatusCode::INTERNAL_SERVER_ERROR, error, JSON) } @@ -77,21 +73,16 @@ pub fn invalid_allow_headers() -> hyper::Response { /// Create a json response for oversized requests (413) pub fn too_large() -> hyper::Response { - let error = serde_json::to_string(&RpcError { - jsonrpc: TwoPointZero, - error: ErrorCode::OversizedRequest.into(), - id: Id::Null, - }) - .expect("built from known-good data; qed"); + let error = serde_json::to_string(&RpcError::new(ErrorCode::OversizedRequest.into(), Id::Null)) + .expect("built from known-good data; qed"); from_template(hyper::StatusCode::PAYLOAD_TOO_LARGE, error, JSON) } /// Create a json response for empty or malformed requests (400) pub fn malformed() -> hyper::Response { - let error = - serde_json::to_string(&RpcError { jsonrpc: TwoPointZero, error: ErrorCode::ParseError.into(), id: Id::Null }) - .expect("built from known-good data; qed"); + let error = serde_json::to_string(&RpcError::new(ErrorCode::ParseError.into(), Id::Null)) + .expect("built from known-good data; qed"); from_template(hyper::StatusCode::BAD_REQUEST, error, JSON) } diff --git a/types/src/lib.rs b/types/src/lib.rs index a932e94d5c..48a074930b 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -70,3 +70,6 @@ pub mod __reexports { /// JSON-RPC result. pub type RpcResult = std::result::Result; + +/// Empty `RpcParams` type; +pub type EmptyParams = Vec<()>; diff --git a/types/src/v2/request.rs b/types/src/v2/request.rs index e63ee3ef97..57739c0bfe 100644 --- a/types/src/v2/request.rs +++ b/types/src/v2/request.rs @@ -49,6 +49,13 @@ pub struct Request<'a> { pub params: Option<&'a RawValue>, } +impl<'a> Request<'a> { + /// Create a new [`Request`]. + pub fn new(method: Cow<'a, str>, params: Option<&'a RawValue>, id: Id<'a>) -> Self { + Self { jsonrpc: TwoPointZero, id, method, params } + } +} + /// JSON-RPC Invalid request as defined in the [spec](https://www.jsonrpc.org/specification#request-object). #[derive(Deserialize, Debug, PartialEq)] pub struct InvalidRequest<'a> { @@ -71,6 +78,13 @@ pub struct Notification<'a, T> { pub params: T, } +impl<'a, T> Notification<'a, T> { + /// Create a new [`Notification`]. + pub fn new(method: Cow<'a, str>, params: T) -> Self { + Self { jsonrpc: TwoPointZero, method, params } + } +} + /// Serializable [JSON-RPC object](https://www.jsonrpc.org/specification#request-object). #[derive(Serialize, Debug)] pub struct RequestSer<'a> { diff --git a/types/src/v2/response.rs b/types/src/v2/response.rs index 1936614fa7..3ce3324912 100644 --- a/types/src/v2/response.rs +++ b/types/src/v2/response.rs @@ -45,6 +45,13 @@ pub struct Response<'a, T> { pub id: Id<'a>, } +impl<'a, T> Response<'a, T> { + /// Create a new [`Response`]. + pub fn new(result: T, id: Id<'a>) -> Response<'a, T> { + Response { jsonrpc: TwoPointZero, result, id } + } +} + /// Return value for subscriptions. #[derive(Serialize, Deserialize, Debug)] pub struct SubscriptionPayload { diff --git a/utils/src/server/helpers.rs b/utils/src/server/helpers.rs index 25d863f07e..d5182b4807 100644 --- a/utils/src/server/helpers.rs +++ b/utils/src/server/helpers.rs @@ -31,7 +31,7 @@ use jsonrpsee_types::to_json_raw_value; use jsonrpsee_types::v2::error::{OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG}; use jsonrpsee_types::v2::{ error::{CALL_EXECUTION_FAILED_CODE, UNKNOWN_ERROR_CODE}, - ErrorCode, ErrorObject, Id, InvalidRequest, Response, RpcError, TwoPointZero, + ErrorCode, ErrorObject, Id, InvalidRequest, Response, RpcError, }; use serde::Serialize; @@ -106,8 +106,7 @@ impl MethodSink { pub fn send_response(&self, id: Id, result: impl Serialize) -> bool { let mut writer = BoundedWriter::new(self.max_response_size as usize); - let json = match serde_json::to_writer(&mut writer, &Response { jsonrpc: TwoPointZero, id: id.clone(), result }) - { + let json = match serde_json::to_writer(&mut writer, &Response::new(result, id.clone())) { Ok(_) => { // Safety - serde_json does not emit invalid UTF-8. unsafe { String::from_utf8_unchecked(writer.into_bytes()) } @@ -139,7 +138,7 @@ impl MethodSink { /// Send a JSON-RPC error to the client pub fn send_error(&self, id: Id, error: ErrorObject) -> bool { - let json = match serde_json::to_string(&RpcError { jsonrpc: TwoPointZero, error, id }) { + let json = match serde_json::to_string(&RpcError::new(error, id)) { Ok(json) => json, Err(err) => { tracing::error!("Error serializing error message: {:?}", err); @@ -214,16 +213,14 @@ pub async fn collect_batch_response(rx: mpsc::UnboundedReceiver) -> Stri #[cfg(test)] mod tests { - use super::{BoundedWriter, Id, Response, TwoPointZero}; + use super::{BoundedWriter, Id, Response}; #[test] fn bounded_serializer_work() { let mut writer = BoundedWriter::new(100); let result = "success"; - assert!( - serde_json::to_writer(&mut writer, &Response { jsonrpc: TwoPointZero, id: Id::Number(1), result }).is_ok() - ); + assert!(serde_json::to_writer(&mut writer, &Response::new(result, Id::Number(1))).is_ok()); assert_eq!(String::from_utf8(writer.into_bytes()).unwrap(), r#"{"jsonrpc":"2.0","result":"success","id":1}"#); } diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 9ba5aaf7ed..8efcdaeaf7 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -36,7 +36,7 @@ use jsonrpsee_types::{ traits::ToRpcParams, v2::{ ErrorCode, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload, - SubscriptionResponse, TwoPointZero, + SubscriptionResponse, }, DeserializeOwned, }; @@ -357,12 +357,7 @@ impl Methods { /// Helper alternative to `execute`, useful for writing unit tests without having to spin /// a server up. pub async fn call(&self, method: &str, params: Option>) -> Option { - let req = Request { - jsonrpc: TwoPointZero, - id: Id::Number(0), - method: Cow::borrowed(method), - params: params.as_deref(), - }; + let req = Request::new(method.into(), params.as_deref(), Id::Number(0)); let (tx, mut rx) = mpsc::unbounded(); let sink = MethodSink::new(tx); @@ -374,13 +369,64 @@ impl Methods { rx.next().await } + /// Perform a "in memory JSON-RPC method call" and receive further subscriptions. + /// This is useful if you want to support both `method calls` and `subscriptions` + /// in the same API. + /// + /// There are better variants than this method if you only want + /// method calls or only subscriptions. + /// + /// See [`Methods::test_subscription`] and [`Methods::call_with`] for + /// for further documentation. + /// + /// Returns a response to the actual method call and a stream to process + /// for further notifications if a subscription was registered by the call. + /// + /// ``` + /// #[tokio::main] + /// async fn main() { + /// use jsonrpsee::RpcModule; + /// use jsonrpsee::types::{ + /// EmptyParams, + /// v2::{Response, SubscriptionResponse} + /// }; + /// use futures_util::StreamExt; + /// + /// let mut module = RpcModule::new(()); + /// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| { + /// sink.send(&"one answer").unwrap(); + /// Ok(()) + /// }).unwrap(); + /// let (resp, mut stream) = module.call_and_subscribe("hi", EmptyParams::new()).await.unwrap(); + /// assert!(serde_json::from_str::>(&resp).is_ok()); + /// let raw_sub_resp = stream.next().await.unwrap(); + /// let sub_resp: SubscriptionResponse = serde_json::from_str(&raw_sub_resp).unwrap(); + /// assert_eq!(&sub_resp.params.result, "one answer"); + /// } + /// ``` + pub async fn call_and_subscribe( + &self, + method: &str, + params: Params, + ) -> Option<(String, mpsc::UnboundedReceiver)> { + let (tx, mut rx) = mpsc::unbounded(); + let params = params.to_rpc_params().ok(); + let req = Request::new(method.into(), params.as_deref(), Id::Number(0)); + let sink = MethodSink::new(tx); + + if let MethodResult::Async(fut) = self.execute(&sink, req, 0) { + fut.await; + } + + rx.next().await.map(|r| (r, rx)) + } + /// Test helper that sets up a subscription using the given `method`. Returns a tuple of the /// [`SubscriptionId`] and a channel on which subscription JSON payloads can be received. pub async fn test_subscription(&self, method: &str, params: impl ToRpcParams) -> TestSubscription { let params = params.to_rpc_params().expect("valid JSON-RPC params"); tracing::trace!("[Methods::test_subscription] Calling subscription method: {:?}, params: {:?}", method, params); - let req = - Request { jsonrpc: TwoPointZero, id: Id::Number(0), method: Cow::borrowed(method), params: Some(¶ms) }; + let req = Request::new(method.into(), Some(¶ms), Id::Number(0)); let (tx, mut rx) = mpsc::unbounded(); let sink = MethodSink::new(tx.clone()); @@ -703,11 +749,10 @@ impl SubscriptionSink { } fn build_message(&self, result: &T) -> Result { - serde_json::to_string(&SubscriptionResponse { - jsonrpc: TwoPointZero, - method: self.method.into(), - params: SubscriptionPayload { subscription: RpcSubscriptionId::Num(self.uniq_sub.sub_id), result }, - }) + serde_json::to_string(&SubscriptionResponse::new( + self.method.into(), + SubscriptionPayload { subscription: RpcSubscriptionId::Num(self.uniq_sub.sub_id), result }, + )) .map_err(Into::into) }