Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jsonrpsee types]: unify a couple of types + more tests #389

Merged
merged 2 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use jsonrpsee_utils::server::{
rpc_module::Methods,
};

use serde_json::value::RawValue;
use socket2::{Domain, Socket, Type};
use std::{
cmp,
Expand Down Expand Up @@ -196,7 +197,8 @@ impl Server {
if let Ok(req) = serde_json::from_slice::<JsonRpcRequest>(&body) {
// NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here.
methods.execute(&tx, req, 0).await;
} else if let Ok(_req) = serde_json::from_slice::<JsonRpcNotification>(&body) {
} else if let Ok(_req) = serde_json::from_slice::<JsonRpcNotification<Option<&RawValue>>>(&body)
{
return Ok::<_, HyperError>(response::ok_response("".into()));
} else if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&body) {
if !batch.is_empty() {
Expand All @@ -207,7 +209,9 @@ impl Server {
} else {
send_error(Id::Null, &tx, JsonRpcErrorCode::InvalidRequest.into());
}
} else if let Ok(_batch) = serde_json::from_slice::<Vec<JsonRpcNotification>>(&body) {
} else if let Ok(_batch) =
serde_json::from_slice::<Vec<JsonRpcNotification<Option<&RawValue>>>>(&body)
{
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
log::error!(
Expand Down
3 changes: 1 addition & 2 deletions tests/tests/proc_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,5 @@ async fn proc_macros_generic_http_client_api() {

assert_eq!(Test::<String>::say_hello(&client).await.unwrap(), "hello".to_string());
assert_eq!(Test2::<u16, String>::foo(&client, 99_u16).await.unwrap(), "hello".to_string());
// TODO: https://github.com/paritytech/jsonrpsee/issues/212
//assert!(Registrar::register_para(&client, 99, "para").await.is_ok());
assert!(Registrar::register_para(&client, 99, "para").await.is_ok());
}
39 changes: 26 additions & 13 deletions types/src/v2/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,12 @@ use beef::Cow;
use serde::de::{self, Deserializer, Unexpected, Visitor};
use serde::ser::Serializer;
use serde::{Deserialize, Serialize};
use serde_json::{value::RawValue, Value as JsonValue};
use serde_json::Value as JsonValue;
use std::fmt;

/// JSON-RPC parameter values for subscriptions.
#[derive(Serialize, Deserialize, Debug)]
pub struct JsonRpcNotificationParams<'a> {
/// Subscription ID
pub subscription: u64,
/// Result.
#[serde(borrow)]
pub result: &'a RawValue,
}

/// JSON-RPC parameter values for subscriptions with support for number and strings.
#[derive(Deserialize, Debug)]
pub struct JsonRpcNotificationParamsAlloc<T> {
pub struct JsonRpcSubscriptionParams<T> {
/// Subscription ID
pub subscription: SubscriptionId,
/// Result.
Expand Down Expand Up @@ -245,7 +235,9 @@ impl<'a> From<Id<'a>> for OwnedId {

#[cfg(test)]
mod test {
use super::{Cow, Id, JsonRpcParams, JsonValue, RpcParams, SubscriptionId, TwoPointZero};
use super::{
Cow, Id, JsonRpcParams, JsonRpcSubscriptionParams, JsonValue, RpcParams, SubscriptionId, TwoPointZero,
};

#[test]
fn id_deserialization() {
Expand Down Expand Up @@ -335,4 +327,25 @@ mod test {
assert_eq!(&serialized, initial_ser);
}
}

#[test]
fn subscription_params_serialize_work() {
let ser =
serde_json::to_string(&JsonRpcSubscriptionParams { subscription: SubscriptionId::Num(12), result: "goal" })
.unwrap();
let exp = r#"{"subscription":12,"result":"goal"}"#;
assert_eq!(ser, exp);
}

#[test]
fn subscription_params_deserialize_work() {
let ser = r#"{"subscription":"9","result":"offside"}"#;
assert!(
serde_json::from_str::<JsonRpcSubscriptionParams<()>>(ser).is_err(),
"invalid type should not be deserializable"
);
let dsr: JsonRpcSubscriptionParams<JsonValue> = serde_json::from_str(ser).unwrap();
assert_eq!(dsr.subscription, SubscriptionId::Str("9".into()));
assert_eq!(dsr.result, serde_json::json!("offside"));
}
}
7 changes: 3 additions & 4 deletions types/src/v2/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@ pub struct JsonRpcInvalidRequest<'a> {
/// JSON-RPC notification (a request object without a request ID).
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct JsonRpcNotification<'a> {
pub struct JsonRpcNotification<'a, T> {
/// JSON-RPC version.
pub jsonrpc: TwoPointZero,
/// Name of the method to be invoked.
pub method: &'a str,
/// Parameter values of the request.
#[serde(borrow)]
pub params: Option<&'a RawValue>,
pub params: T,
}

/// Serializable [JSON-RPC object](https://www.jsonrpc.org/specification#request-object)
Expand Down Expand Up @@ -116,7 +115,7 @@ mod test {
#[test]
fn deserialize_valid_notif_works() {
let ser = r#"{"jsonrpc":"2.0","method":"say_hello","params":[]}"#;
let dsr: JsonRpcNotification = serde_json::from_str(ser).unwrap();
let dsr: JsonRpcNotification<&RawValue> = serde_json::from_str(ser).unwrap();
assert_eq!(dsr.method, "say_hello");
assert_eq!(dsr.jsonrpc, TwoPointZero);
}
Expand Down
51 changes: 19 additions & 32 deletions types/src/v2/response.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::v2::params::{Id, JsonRpcNotificationParams, JsonRpcNotificationParamsAlloc, TwoPointZero};
use crate::v2::params::{Id, TwoPointZero};
use serde::{Deserialize, Serialize};

/// JSON-RPC successful response object.
Expand All @@ -14,37 +14,24 @@ pub struct JsonRpcResponse<'a, T> {
pub id: Id<'a>,
}

/// JSON-RPC subscription response.
#[derive(Serialize, Debug)]
pub struct JsonRpcSubscriptionResponse<'a> {
/// JSON-RPC version.
pub jsonrpc: TwoPointZero,
/// Method
pub method: &'a str,
/// Params.
pub params: JsonRpcNotificationParams<'a>,
}
#[cfg(test)]
mod tests {
use super::{Id, JsonRpcResponse, TwoPointZero};

/// JSON-RPC subscription response.
#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct JsonRpcSubscriptionResponseAlloc<'a, T> {
/// JSON-RPC version.
pub jsonrpc: TwoPointZero,
/// Method
pub method: &'a str,
/// Params.
pub params: JsonRpcNotificationParamsAlloc<T>,
}
#[test]
fn serialize_call_response() {
let ser =
serde_json::to_string(&JsonRpcResponse { jsonrpc: TwoPointZero, result: "ok", id: Id::Number(1) }).unwrap();
let exp = r#"{"jsonrpc":"2.0","result":"ok","id":1}"#;
assert_eq!(ser, exp);
}

/// JSON-RPC notification response.
#[derive(Deserialize, Serialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct JsonRpcNotifResponse<'a, T> {
/// JSON-RPC version.
pub jsonrpc: TwoPointZero,
/// Method
pub method: &'a str,
/// Params.
pub params: T,
#[test]
fn deserialize_call() {
let exp = JsonRpcResponse { jsonrpc: TwoPointZero, result: 99_u64, id: Id::Number(11) };
let dsr: JsonRpcResponse<u64> = serde_json::from_str(r#"{"jsonrpc":"2.0", "result":99, "id":11}"#).unwrap();
assert_eq!(dsr.jsonrpc, exp.jsonrpc);
assert_eq!(dsr.result, exp.result);
assert_eq!(dsr.id, exp.id);
}
}
37 changes: 17 additions & 20 deletions utils/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ use futures_channel::{mpsc, oneshot};
use futures_util::{future::BoxFuture, FutureExt};
use jsonrpsee_types::error::{CallError, Error, SubscriptionClosedError};
use jsonrpsee_types::v2::error::{JsonRpcErrorCode, JsonRpcErrorObject, CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_types::v2::params::{Id, JsonRpcNotificationParams, OwnedId, OwnedRpcParams, RpcParams, TwoPointZero};
use jsonrpsee_types::v2::request::JsonRpcRequest;
use jsonrpsee_types::v2::response::JsonRpcSubscriptionResponse;
use jsonrpsee_types::v2::params::{
Id, JsonRpcSubscriptionParams, OwnedId, OwnedRpcParams, RpcParams, SubscriptionId as JsonRpcSubscriptionId,
TwoPointZero,
};
use jsonrpsee_types::v2::request::{JsonRpcNotification, JsonRpcRequest};

use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use serde::Serialize;
use serde_json::value::{to_raw_value, RawValue};
use std::fmt::Debug;
use std::sync::Arc;

Expand Down Expand Up @@ -367,18 +368,20 @@ pub struct SubscriptionSink {
impl SubscriptionSink {
/// Send message on this subscription.
pub fn send<T: Serialize>(&mut self, result: &T) -> Result<(), Error> {
let result = to_raw_value(result)?;
self.send_raw_value(&result)
let msg = self.build_message(result)?;
self.inner_send(msg).map_err(Into::into)
}

fn send_raw_value(&mut self, result: &RawValue) -> Result<(), Error> {
let msg = serde_json::to_string(&JsonRpcSubscriptionResponse {
fn build_message<T: Serialize>(&self, result: &T) -> Result<String, Error> {
serde_json::to_string(&JsonRpcNotification {
jsonrpc: TwoPointZero,
method: self.method,
params: JsonRpcNotificationParams { subscription: self.uniq_sub.sub_id, result: &*result },
})?;

self.inner_send(msg).map_err(Into::into)
params: JsonRpcSubscriptionParams {
subscription: JsonRpcSubscriptionId::Num(self.uniq_sub.sub_id),
result,
},
})
.map_err(Into::into)
}

fn inner_send(&mut self, msg: String) -> Result<(), Error> {
Expand All @@ -404,14 +407,8 @@ impl SubscriptionSink {
pub fn close(&mut self, close_reason: String) {
self.is_connected.take();
if let Some((sink, _)) = self.subscribers.lock().remove(&self.uniq_sub) {
let result =
to_raw_value(&SubscriptionClosedError::from(close_reason)).expect("valid json infallible; qed");
let msg = serde_json::to_string(&JsonRpcSubscriptionResponse {
jsonrpc: TwoPointZero,
method: self.method,
params: JsonRpcNotificationParams { subscription: self.uniq_sub.sub_id, result: &*result },
})
.expect("valid json infallible; qed");
let msg =
self.build_message(&SubscriptionClosedError::from(close_reason)).expect("valid json infallible; qed");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats qed?

Copy link
Member Author

@niklasad1 niklasad1 Jun 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://en.wikipedia.org/wiki/Q.E.D.

basically, a hand-wavy proof that a given unwrap/expect is infallible otherwise it would crash the thread that executed the code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's latin, old-europe snobbish way of saying "tada!"

let _ = sink.unbounded_send(msg);
}
}
Expand Down
11 changes: 7 additions & 4 deletions ws-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use crate::traits::{Client, SubscriptionClient};
use crate::transport::{Receiver as WsReceiver, Sender as WsSender, Target, WsTransportClientBuilder};
use crate::v2::error::JsonRpcError;
use crate::v2::params::{Id, JsonRpcParams};
use crate::v2::request::{JsonRpcCallSer, JsonRpcNotificationSer};
use crate::v2::response::{JsonRpcNotifResponse, JsonRpcResponse, JsonRpcSubscriptionResponseAlloc};
use crate::v2::request::{JsonRpcCallSer, JsonRpcNotification, JsonRpcNotificationSer};
use crate::v2::response::JsonRpcResponse;
use crate::TEN_MB_SIZE_BYTES;
use crate::{
helpers::{
Expand All @@ -51,6 +51,7 @@ use futures::{
sink::SinkExt,
};

use jsonrpsee_types::v2::params::JsonRpcSubscriptionParams;
use jsonrpsee_types::SubscriptionKind;
use serde::de::DeserializeOwned;
use std::{
Expand Down Expand Up @@ -621,14 +622,16 @@ async fn background_task(
}
}
// Subscription response.
else if let Ok(notif) = serde_json::from_slice::<JsonRpcSubscriptionResponseAlloc<_>>(&raw) {
else if let Ok(notif) =
serde_json::from_slice::<JsonRpcNotification<JsonRpcSubscriptionParams<_>>>(&raw)
{
log::debug!("[backend]: recv subscription {:?}", notif);
if let Err(Some(unsub)) = process_subscription_response(&mut manager, notif) {
let _ = stop_subscription(&mut sender, &mut manager, unsub).await;
}
}
// Incoming Notification
else if let Ok(notif) = serde_json::from_slice::<JsonRpcNotifResponse<_>>(&raw) {
else if let Ok(notif) = serde_json::from_slice::<JsonRpcNotification<_>>(&raw) {
log::debug!("[backend]: recv notification {:?}", notif);
let _ = process_notification(&mut manager, notif);
}
Expand Down
10 changes: 5 additions & 5 deletions ws-client/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::manager::{RequestManager, RequestStatus};
use crate::transport::Sender as WsSender;
use futures::channel::mpsc;
use jsonrpsee_types::v2::params::{Id, JsonRpcParams, SubscriptionId};
use jsonrpsee_types::v2::request::JsonRpcCallSer;
use jsonrpsee_types::v2::response::{JsonRpcNotifResponse, JsonRpcResponse, JsonRpcSubscriptionResponseAlloc};
use jsonrpsee_types::v2::params::{Id, JsonRpcParams, JsonRpcSubscriptionParams, SubscriptionId};
use jsonrpsee_types::v2::request::{JsonRpcCallSer, JsonRpcNotification};
use jsonrpsee_types::v2::response::JsonRpcResponse;
use jsonrpsee_types::{v2::error::JsonRpcError, Error, RequestMessage};
use serde_json::Value as JsonValue;

Expand Down Expand Up @@ -46,7 +46,7 @@ pub fn process_batch_response(manager: &mut RequestManager, rps: Vec<JsonRpcResp
/// Returns `Err(Some(msg))` if the subscription was full.
pub fn process_subscription_response(
manager: &mut RequestManager,
notif: JsonRpcSubscriptionResponseAlloc<JsonValue>,
notif: JsonRpcNotification<JsonRpcSubscriptionParams<JsonValue>>,
) -> Result<(), Option<RequestMessage>> {
let sub_id = notif.params.subscription;
let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
Expand Down Expand Up @@ -75,7 +75,7 @@ pub fn process_subscription_response(
///
/// Returns Ok() if the response was successfully handled
/// Returns Err() if there was no handler for the method
pub fn process_notification(manager: &mut RequestManager, notif: JsonRpcNotifResponse<JsonValue>) -> Result<(), Error> {
pub fn process_notification(manager: &mut RequestManager, notif: JsonRpcNotification<JsonValue>) -> Result<(), Error> {
match manager.as_notification_handler_mut(notif.method.to_owned()) {
Some(send_back_sink) => match send_back_sink.try_send(notif.params) {
Ok(()) => Ok(()),
Expand Down