Skip to content

Commit

Permalink
rpc module: add call_and_subscribe (#588)
Browse files Browse the repository at this point in the history
* rpc module: add `call_and_subscribe`

* Update utils/src/server/rpc_module.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update types/src/v2/response.rs

Co-authored-by: David <dvdplm@gmail.com>

* grumbles

* fix rustdoc links

* Update utils/src/server/rpc_module.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update utils/src/server/rpc_module.rs

Co-authored-by: David <dvdplm@gmail.com>

Co-authored-by: David <dvdplm@gmail.com>
  • Loading branch information
niklasad1 and dvdplm committed Dec 7, 2021
1 parent be6f64a commit f8a412e
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 38 deletions.
23 changes: 7 additions & 16 deletions http-server/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,15 @@

//! Contains common builders for hyper responses.
use crate::types::v2::{ErrorCode, Id, RpcError, TwoPointZero};
use crate::types::v2::{ErrorCode, Id, RpcError};

const JSON: &str = "application/json; charset=utf-8";
const TEXT: &str = "text/plain";

/// Create a response for json internal error.
pub fn internal_error() -> hyper::Response<hyper::Body> {
let error = serde_json::to_string(&RpcError {
jsonrpc: TwoPointZero,
error: ErrorCode::InternalError.into(),
id: Id::Null,
})
.expect("built from known-good data; qed");
let error = serde_json::to_string(&RpcError::new(ErrorCode::InternalError.into(), Id::Null))
.expect("built from known-good data; qed");

from_template(hyper::StatusCode::INTERNAL_SERVER_ERROR, error, JSON)
}
Expand Down Expand Up @@ -77,21 +73,16 @@ pub fn invalid_allow_headers() -> hyper::Response<hyper::Body> {

/// Create a json response for oversized requests (413)
pub fn too_large() -> hyper::Response<hyper::Body> {
let error = serde_json::to_string(&RpcError {
jsonrpc: TwoPointZero,
error: ErrorCode::OversizedRequest.into(),
id: Id::Null,
})
.expect("built from known-good data; qed");
let error = serde_json::to_string(&RpcError::new(ErrorCode::OversizedRequest.into(), Id::Null))
.expect("built from known-good data; qed");

from_template(hyper::StatusCode::PAYLOAD_TOO_LARGE, error, JSON)
}

/// Create a json response for empty or malformed requests (400)
pub fn malformed() -> hyper::Response<hyper::Body> {
let error =
serde_json::to_string(&RpcError { jsonrpc: TwoPointZero, error: ErrorCode::ParseError.into(), id: Id::Null })
.expect("built from known-good data; qed");
let error = serde_json::to_string(&RpcError::new(ErrorCode::ParseError.into(), Id::Null))
.expect("built from known-good data; qed");

from_template(hyper::StatusCode::BAD_REQUEST, error, JSON)
}
Expand Down
3 changes: 3 additions & 0 deletions types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,6 @@ pub mod __reexports {

/// JSON-RPC result.
pub type RpcResult<T> = std::result::Result<T, Error>;

/// Empty `RpcParams` type;
pub type EmptyParams = Vec<()>;
14 changes: 14 additions & 0 deletions types/src/v2/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ pub struct Request<'a> {
pub params: Option<&'a RawValue>,
}

impl<'a> Request<'a> {
/// Create a new [`Request`].
pub fn new(method: Cow<'a, str>, params: Option<&'a RawValue>, id: Id<'a>) -> Self {
Self { jsonrpc: TwoPointZero, id, method, params }
}
}

/// JSON-RPC Invalid request as defined in the [spec](https://www.jsonrpc.org/specification#request-object).
#[derive(Deserialize, Debug, PartialEq)]
pub struct InvalidRequest<'a> {
Expand All @@ -71,6 +78,13 @@ pub struct Notification<'a, T> {
pub params: T,
}

impl<'a, T> Notification<'a, T> {
/// Create a new [`Notification`].
pub fn new(method: Cow<'a, str>, params: T) -> Self {
Self { jsonrpc: TwoPointZero, method, params }
}
}

/// Serializable [JSON-RPC object](https://www.jsonrpc.org/specification#request-object).
#[derive(Serialize, Debug)]
pub struct RequestSer<'a> {
Expand Down
7 changes: 7 additions & 0 deletions types/src/v2/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ pub struct Response<'a, T> {
pub id: Id<'a>,
}

impl<'a, T> Response<'a, T> {
/// Create a new [`Response`].
pub fn new(result: T, id: Id<'a>) -> Response<'a, T> {
Response { jsonrpc: TwoPointZero, result, id }
}
}

/// Return value for subscriptions.
#[derive(Serialize, Deserialize, Debug)]
pub struct SubscriptionPayload<T> {
Expand Down
13 changes: 5 additions & 8 deletions utils/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use jsonrpsee_types::to_json_raw_value;
use jsonrpsee_types::v2::error::{OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG};
use jsonrpsee_types::v2::{
error::{CALL_EXECUTION_FAILED_CODE, UNKNOWN_ERROR_CODE},
ErrorCode, ErrorObject, Id, InvalidRequest, Response, RpcError, TwoPointZero,
ErrorCode, ErrorObject, Id, InvalidRequest, Response, RpcError,
};
use serde::Serialize;

Expand Down Expand Up @@ -106,8 +106,7 @@ impl MethodSink {
pub fn send_response(&self, id: Id, result: impl Serialize) -> bool {
let mut writer = BoundedWriter::new(self.max_response_size as usize);

let json = match serde_json::to_writer(&mut writer, &Response { jsonrpc: TwoPointZero, id: id.clone(), result })
{
let json = match serde_json::to_writer(&mut writer, &Response::new(result, id.clone())) {
Ok(_) => {
// Safety - serde_json does not emit invalid UTF-8.
unsafe { String::from_utf8_unchecked(writer.into_bytes()) }
Expand Down Expand Up @@ -139,7 +138,7 @@ impl MethodSink {

/// Send a JSON-RPC error to the client
pub fn send_error(&self, id: Id, error: ErrorObject) -> bool {
let json = match serde_json::to_string(&RpcError { jsonrpc: TwoPointZero, error, id }) {
let json = match serde_json::to_string(&RpcError::new(error, id)) {
Ok(json) => json,
Err(err) => {
tracing::error!("Error serializing error message: {:?}", err);
Expand Down Expand Up @@ -214,16 +213,14 @@ pub async fn collect_batch_response(rx: mpsc::UnboundedReceiver<String>) -> Stri

#[cfg(test)]
mod tests {
use super::{BoundedWriter, Id, Response, TwoPointZero};
use super::{BoundedWriter, Id, Response};

#[test]
fn bounded_serializer_work() {
let mut writer = BoundedWriter::new(100);
let result = "success";

assert!(
serde_json::to_writer(&mut writer, &Response { jsonrpc: TwoPointZero, id: Id::Number(1), result }).is_ok()
);
assert!(serde_json::to_writer(&mut writer, &Response::new(result, Id::Number(1))).is_ok());
assert_eq!(String::from_utf8(writer.into_bytes()).unwrap(), r#"{"jsonrpc":"2.0","result":"success","id":1}"#);
}

Expand Down
73 changes: 59 additions & 14 deletions utils/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use jsonrpsee_types::{
traits::ToRpcParams,
v2::{
ErrorCode, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload,
SubscriptionResponse, TwoPointZero,
SubscriptionResponse,
},
DeserializeOwned,
};
Expand Down Expand Up @@ -357,12 +357,7 @@ impl Methods {
/// Helper alternative to `execute`, useful for writing unit tests without having to spin
/// a server up.
pub async fn call(&self, method: &str, params: Option<Box<RawValue>>) -> Option<String> {
let req = Request {
jsonrpc: TwoPointZero,
id: Id::Number(0),
method: Cow::borrowed(method),
params: params.as_deref(),
};
let req = Request::new(method.into(), params.as_deref(), Id::Number(0));

let (tx, mut rx) = mpsc::unbounded();
let sink = MethodSink::new(tx);
Expand All @@ -374,13 +369,64 @@ impl Methods {
rx.next().await
}

/// Perform a "in memory JSON-RPC method call" and receive further subscriptions.
/// This is useful if you want to support both `method calls` and `subscriptions`
/// in the same API.
///
/// There are better variants than this method if you only want
/// method calls or only subscriptions.
///
/// See [`Methods::test_subscription`] and [`Methods::call_with`] for
/// for further documentation.
///
/// Returns a response to the actual method call and a stream to process
/// for further notifications if a subscription was registered by the call.
///
/// ```
/// #[tokio::main]
/// async fn main() {
/// use jsonrpsee::RpcModule;
/// use jsonrpsee::types::{
/// EmptyParams,
/// v2::{Response, SubscriptionResponse}
/// };
/// use futures_util::StreamExt;
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
/// sink.send(&"one answer").unwrap();
/// Ok(())
/// }).unwrap();
/// let (resp, mut stream) = module.call_and_subscribe("hi", EmptyParams::new()).await.unwrap();
/// assert!(serde_json::from_str::<Response<u64>>(&resp).is_ok());
/// let raw_sub_resp = stream.next().await.unwrap();
/// let sub_resp: SubscriptionResponse<String> = serde_json::from_str(&raw_sub_resp).unwrap();
/// assert_eq!(&sub_resp.params.result, "one answer");
/// }
/// ```
pub async fn call_and_subscribe<Params: ToRpcParams>(
&self,
method: &str,
params: Params,
) -> Option<(String, mpsc::UnboundedReceiver<String>)> {
let (tx, mut rx) = mpsc::unbounded();
let params = params.to_rpc_params().ok();
let req = Request::new(method.into(), params.as_deref(), Id::Number(0));
let sink = MethodSink::new(tx);

if let MethodResult::Async(fut) = self.execute(&sink, req, 0) {
fut.await;
}

rx.next().await.map(|r| (r, rx))
}

/// Test helper that sets up a subscription using the given `method`. Returns a tuple of the
/// [`SubscriptionId`] and a channel on which subscription JSON payloads can be received.
pub async fn test_subscription(&self, method: &str, params: impl ToRpcParams) -> TestSubscription {
let params = params.to_rpc_params().expect("valid JSON-RPC params");
tracing::trace!("[Methods::test_subscription] Calling subscription method: {:?}, params: {:?}", method, params);
let req =
Request { jsonrpc: TwoPointZero, id: Id::Number(0), method: Cow::borrowed(method), params: Some(&params) };
let req = Request::new(method.into(), Some(&params), Id::Number(0));

let (tx, mut rx) = mpsc::unbounded();
let sink = MethodSink::new(tx.clone());
Expand Down Expand Up @@ -703,11 +749,10 @@ impl SubscriptionSink {
}

fn build_message<T: Serialize>(&self, result: &T) -> Result<String, Error> {
serde_json::to_string(&SubscriptionResponse {
jsonrpc: TwoPointZero,
method: self.method.into(),
params: SubscriptionPayload { subscription: RpcSubscriptionId::Num(self.uniq_sub.sub_id), result },
})
serde_json::to_string(&SubscriptionResponse::new(
self.method.into(),
SubscriptionPayload { subscription: RpcSubscriptionId::Num(self.uniq_sub.sub_id), result },
))
.map_err(Into::into)
}

Expand Down

0 comments on commit f8a412e

Please sign in to comment.