Skip to content

Commit

Permalink
feat(client): support request id as Strings. (#659)
Browse files Browse the repository at this point in the history
* feat(client): support request id as Strings.

* add tests for Id::String

* address grumbles: move id_kind to RequestManager

* Update client/http-client/src/client.rs

* types: take ref to `ID` get rid of some `Clone`

* remove more clone

* grumbles: rename tests
  • Loading branch information
niklasad1 authored Jan 21, 2022
1 parent c0f343d commit 708d421
Show file tree
Hide file tree
Showing 12 changed files with 302 additions and 115 deletions.
4 changes: 2 additions & 2 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
b.iter(|| {
let params = &[1_u64.into(), 2_u32.into()];
let params = ParamsSer::ArrayRef(params);
let request = RequestSer::new(Id::Number(0), "say_hello", Some(params));
let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
v2_serialize(request);
})
});

crit.bench_function("jsonrpsee_types_v2_vec", |b| {
b.iter(|| {
let params = ParamsSer::Array(vec![1_u64.into(), 2_u32.into()]);
let request = RequestSer::new(Id::Number(0), "say_hello", Some(params));
let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
v2_serialize(request);
})
});
Expand Down
34 changes: 21 additions & 13 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::time::Duration;
use crate::transport::HttpTransportClient;
use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response};
use async_trait::async_trait;
use jsonrpsee_core::client::{CertificateStore, ClientT, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use rustc_hash::FxHashMap;
use serde::de::DeserializeOwned;
Expand All @@ -42,6 +42,7 @@ pub struct HttpClientBuilder {
request_timeout: Duration,
max_concurrent_requests: usize,
certificate_store: CertificateStore,
id_kind: IdKind,
}

impl HttpClientBuilder {
Expand Down Expand Up @@ -69,13 +70,19 @@ impl HttpClientBuilder {
self
}

/// Configure the data type of the request object ID (default is number).
pub fn id_format(mut self, id_kind: IdKind) -> Self {
self.id_kind = id_kind;
self
}

/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
let transport = HttpTransportClient::new(target, self.max_request_body_size, self.certificate_store)
.map_err(|e| Error::Transport(e.into()))?;
Ok(HttpClient {
transport,
id_manager: Arc::new(RequestIdManager::new(self.max_concurrent_requests)),
id_manager: Arc::new(RequestIdManager::new(self.max_concurrent_requests, self.id_kind)),
request_timeout: self.request_timeout,
})
}
Expand All @@ -88,6 +95,7 @@ impl Default for HttpClientBuilder {
request_timeout: Duration::from_secs(60),
max_concurrent_requests: 256,
certificate_store: CertificateStore::Native,
id_kind: IdKind::Number,
}
}
}
Expand Down Expand Up @@ -120,8 +128,9 @@ impl ClientT for HttpClient {
where
R: DeserializeOwned,
{
let id = self.id_manager.next_request_id()?;
let request = RequestSer::new(Id::Number(*id.inner()), method, params);
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let request = RequestSer::new(&id, method, params);

let fut = self.transport.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?);
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Expand All @@ -142,9 +151,7 @@ impl ClientT for HttpClient {
}
};

let response_id = response.id.as_number().copied().ok_or(Error::InvalidRequestId)?;

if response_id == *id.inner() {
if response.id == id {
Ok(response.result)
} else {
Err(Error::InvalidRequestId)
Expand All @@ -155,16 +162,18 @@ impl ClientT for HttpClient {
where
R: DeserializeOwned + Default + Clone,
{
let guard = self.id_manager.next_request_ids(batch.len())?;
let ids: Vec<Id> = guard.inner();

let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
let mut ordered_requests = Vec::with_capacity(batch.len());
let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());

let ids = self.id_manager.next_request_ids(batch.len())?;
for (pos, (method, params)) in batch.into_iter().enumerate() {
batch_request.push(RequestSer::new(Id::Number(ids.inner()[pos]), method, params));
ordered_requests.push(ids.inner()[pos]);
request_set.insert(ids.inner()[pos], pos);
batch_request.push(RequestSer::new(&ids[pos], method, params));
ordered_requests.push(&ids[pos]);
request_set.insert(&ids[pos], pos);
}

let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
Expand All @@ -184,8 +193,7 @@ impl ClientT for HttpClient {
// NOTE: `R::default` is placeholder and will be replaced in loop below.
let mut responses = vec![R::default(); ordered_requests.len()];
for rp in rps {
let response_id = rp.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
let pos = match request_set.get(&response_id) {
let pos = match request_set.get(&rp.id) {
Some(pos) => *pos,
None => return Err(Error::InvalidRequestId),
};
Expand Down
33 changes: 29 additions & 4 deletions client/http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@
use crate::types::error::{ErrorCode, ErrorObject, ErrorResponse};
use crate::types::ParamsSer;
use crate::HttpClientBuilder;
use jsonrpsee_core::client::ClientT;
use jsonrpsee_core::client::{ClientT, IdKind};
use jsonrpsee_core::rpc_params;
use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::Id;
use jsonrpsee_test_utils::TimeoutFutureExt;
use serde_json::value::Value as JsonValue;

#[tokio::test]
async fn method_call_works() {
Expand All @@ -42,7 +41,33 @@ async fn method_call_works() {
.await
.unwrap()
.unwrap();
assert_eq!(JsonValue::String("hello".into()), result);
assert_eq!("hello", &result);
}

#[tokio::test]
async fn method_call_with_wrong_id_kind() {
let exp = "id as string";
let server_addr =
http_server_with_hardcoded_response(ok_response(exp.into(), Id::Num(0))).with_default_timeout().await.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
assert!(matches!(
client.request::<String>("o", None).with_default_timeout().await.unwrap(),
Err(Error::InvalidRequestId)
));
}

#[tokio::test]
async fn method_call_with_id_str() {
let exp = "id as string";
let server_addr = http_server_with_hardcoded_response(ok_response(exp.into(), Id::Str("0".into())))
.with_default_timeout()
.await
.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
let response: String = client.request::<String>("o", None).with_default_timeout().await.unwrap().unwrap();
assert_eq!(&response, exp);
}

#[tokio::test]
Expand Down Expand Up @@ -139,7 +164,7 @@ async fn run_batch_request_with_response<'a>(
client.batch_request(batch).with_default_timeout().await.unwrap()
}

async fn run_request_with_response(response: String) -> Result<JsonValue, Error> {
async fn run_request_with_response(response: String) -> Result<String, Error> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().build(&uri).unwrap();
Expand Down
11 changes: 10 additions & 1 deletion client/ws-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub use jsonrpsee_types as types;
use std::time::Duration;

use jsonrpsee_client_transport::ws::{Header, InvalidUri, Uri, WsTransportClientBuilder};
use jsonrpsee_core::client::{CertificateStore, ClientBuilder};
use jsonrpsee_core::client::{CertificateStore, ClientBuilder, IdKind};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};

/// Builder for [`WsClient`].
Expand Down Expand Up @@ -77,6 +77,7 @@ pub struct WsClientBuilder<'a> {
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
max_redirections: usize,
id_kind: IdKind,
}

impl<'a> Default for WsClientBuilder<'a> {
Expand All @@ -90,6 +91,7 @@ impl<'a> Default for WsClientBuilder<'a> {
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
max_redirections: 5,
id_kind: IdKind::Number,
}
}
}
Expand Down Expand Up @@ -143,6 +145,12 @@ impl<'a> WsClientBuilder<'a> {
self
}

/// See documentation for [`ClientBuilder::id_format`] (default is Number).
pub fn id_format(mut self, kind: IdKind) -> Self {
self.id_kind = kind;
self
}

/// Build the client with specified URL to connect to.
/// You must provide the port number in the URL.
///
Expand All @@ -165,6 +173,7 @@ impl<'a> WsClientBuilder<'a> {
.max_notifs_per_subscription(self.max_notifs_per_subscription)
.request_timeout(self.request_timeout)
.max_concurrent_requests(self.max_concurrent_requests)
.id_format(self.id_kind)
.build(sender, receiver))
}
}
41 changes: 38 additions & 3 deletions client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
use crate::types::error::{ErrorCode, ErrorObject, ErrorResponse};
use crate::types::ParamsSer;
use crate::WsClientBuilder;
use jsonrpsee_core::client::Subscription;
use jsonrpsee_core::client::{ClientT, SubscriptionClientT};
use jsonrpsee_core::client::{IdKind, Subscription};
use jsonrpsee_core::rpc_params;
use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
Expand All @@ -44,7 +44,42 @@ async fn method_call_works() {
.await
.unwrap()
.unwrap();
assert_eq!(JsonValue::String("hello".into()), result);
assert_eq!("hello", &result);
}

#[tokio::test]
async fn method_call_with_wrong_id_kind() {
let exp = "id as string";
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
ok_response(exp.into(), Id::Num(0)),
)
.with_default_timeout()
.await
.unwrap();
let uri = format!("ws://{}", server.local_addr());
let client =
WsClientBuilder::default().id_format(IdKind::String).build(&uri).with_default_timeout().await.unwrap().unwrap();

let err = client.request::<String>("o", None).with_default_timeout().await.unwrap();
assert!(matches!(err, Err(Error::RestartNeeded(e)) if e == "Invalid request ID"));
}

#[tokio::test]
async fn method_call_with_id_str() {
let exp = "id as string";
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
ok_response(exp.into(), Id::Str("0".into())),
)
.with_default_timeout()
.await
.unwrap();
let uri = format!("ws://{}", server.local_addr());
let client =
WsClientBuilder::default().id_format(IdKind::String).build(&uri).with_default_timeout().await.unwrap().unwrap();
let response: String = client.request::<String>("o", None).with_default_timeout().await.unwrap().unwrap();
assert_eq!(&response, exp);
}

#[tokio::test]
Expand Down Expand Up @@ -237,7 +272,7 @@ async fn run_batch_request_with_response<'a>(
client.batch_request(batch).with_default_timeout().await.unwrap()
}

async fn run_request_with_response(response: String) -> Result<JsonValue, Error> {
async fn run_request_with_response(response: String) -> Result<String, Error> {
let server = WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), response)
.with_default_timeout()
.await
Expand Down
16 changes: 8 additions & 8 deletions core/src/client/async_client/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ pub(crate) fn process_batch_response(manager: &mut RequestManager, rps: Vec<Resp
let mut rps_unordered: Vec<_> = Vec::with_capacity(rps.len());

for rp in rps {
let id = rp.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
digest.push(id);
let id = rp.id.into_owned();
digest.push(id.clone());
rps_unordered.push((id, rp.result));
}

Expand Down Expand Up @@ -131,7 +131,7 @@ pub(crate) fn process_single_response(
response: Response<JsonValue>,
max_capacity_per_subscription: usize,
) -> Result<Option<RequestMessage>, Error> {
let response_id = response.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
let response_id = response.id.into_owned();
match manager.request_status(&response_id) {
RequestStatus::PendingMethodCall => {
let send_back_oneshot = match manager.complete_pending_call(response_id) {
Expand All @@ -144,7 +144,7 @@ pub(crate) fn process_single_response(
}
RequestStatus::PendingSubscription => {
let (unsub_id, send_back_oneshot, unsubscribe_method) =
manager.complete_pending_subscription(response_id).ok_or(Error::InvalidRequestId)?;
manager.complete_pending_subscription(response_id.clone()).ok_or(Error::InvalidRequestId)?;

let sub_id: Result<SubscriptionId, _> = response.result.try_into();
let sub_id = match sub_id {
Expand All @@ -157,7 +157,7 @@ pub(crate) fn process_single_response(

let (subscribe_tx, subscribe_rx) = mpsc::channel(max_capacity_per_subscription);
if manager
.insert_subscription(response_id, unsub_id, sub_id.clone(), subscribe_tx, unsubscribe_method)
.insert_subscription(response_id.clone(), unsub_id, sub_id.clone(), subscribe_tx, unsubscribe_method)
.is_ok()
{
match send_back_oneshot.send(Ok((subscribe_rx, sub_id.clone()))) {
Expand Down Expand Up @@ -191,14 +191,14 @@ pub(crate) async fn stop_subscription(
/// Builds an unsubscription message.
pub(crate) fn build_unsubscribe_message(
manager: &mut RequestManager,
sub_req_id: u64,
sub_req_id: Id<'static>,
sub_id: SubscriptionId<'static>,
) -> Option<RequestMessage> {
let (unsub_req_id, _, unsub, sub_id) = manager.remove_subscription(sub_req_id, sub_id)?;
let sub_id_slice: &[JsonValue] = &[sub_id.into()];
// TODO: https://github.com/paritytech/jsonrpsee/issues/275
let params = ParamsSer::ArrayRef(sub_id_slice);
let raw = serde_json::to_string(&RequestSer::new(Id::Number(unsub_req_id), &unsub, Some(params))).ok()?;
let raw = serde_json::to_string(&RequestSer::new(&unsub_req_id, &unsub, Some(params))).ok()?;
Some(RequestMessage { raw, id: unsub_req_id, send_back: None })
}

Expand All @@ -207,7 +207,7 @@ pub(crate) fn build_unsubscribe_message(
/// Returns `Ok` if the response was successfully sent.
/// Returns `Err(_)` if the response ID was not found.
pub(crate) fn process_error_response(manager: &mut RequestManager, err: ErrorResponse) -> Result<(), Error> {
let id = err.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
let id = err.id.clone().into_owned();
match manager.request_status(&id) {
RequestStatus::PendingMethodCall => {
let send_back = manager.complete_pending_call(id).expect("State checked above; qed");
Expand Down
Loading

0 comments on commit 708d421

Please sign in to comment.