Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): support request id as Strings. #659

Merged
merged 7 commits into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
b.iter(|| {
let params = &[1_u64.into(), 2_u32.into()];
let params = ParamsSer::ArrayRef(params);
let request = RequestSer::new(Id::Number(0), "say_hello", Some(params));
let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
v2_serialize(request);
})
});

crit.bench_function("jsonrpsee_types_v2_vec", |b| {
b.iter(|| {
let params = ParamsSer::Array(vec![1_u64.into(), 2_u32.into()]);
let request = RequestSer::new(Id::Number(0), "say_hello", Some(params));
let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
v2_serialize(request);
})
});
Expand Down
34 changes: 21 additions & 13 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::time::Duration;
use crate::transport::HttpTransportClient;
use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response};
use async_trait::async_trait;
use jsonrpsee_core::client::{CertificateStore, ClientT, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use rustc_hash::FxHashMap;
use serde::de::DeserializeOwned;
Expand All @@ -42,6 +42,7 @@ pub struct HttpClientBuilder {
request_timeout: Duration,
max_concurrent_requests: usize,
certificate_store: CertificateStore,
id_kind: IdKind,
}

impl HttpClientBuilder {
Expand Down Expand Up @@ -69,13 +70,19 @@ impl HttpClientBuilder {
self
}

/// Configure the data type of the request object ID (default is number).
pub fn id_format(mut self, id_kind: IdKind) -> Self {
self.id_kind = id_kind;
self
}

/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
let transport = HttpTransportClient::new(target, self.max_request_body_size, self.certificate_store)
.map_err(|e| Error::Transport(e.into()))?;
Ok(HttpClient {
transport,
id_manager: Arc::new(RequestIdManager::new(self.max_concurrent_requests)),
id_manager: Arc::new(RequestIdManager::new(self.max_concurrent_requests, self.id_kind)),
request_timeout: self.request_timeout,
})
}
Expand All @@ -88,6 +95,7 @@ impl Default for HttpClientBuilder {
request_timeout: Duration::from_secs(60),
max_concurrent_requests: 256,
certificate_store: CertificateStore::Native,
id_kind: IdKind::Number,
}
}
}
Expand Down Expand Up @@ -120,8 +128,9 @@ impl ClientT for HttpClient {
where
R: DeserializeOwned,
{
let id = self.id_manager.next_request_id()?;
let request = RequestSer::new(Id::Number(*id.inner()), method, params);
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let request = RequestSer::new(&id, method, params);

let fut = self.transport.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?);
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Expand All @@ -142,9 +151,7 @@ impl ClientT for HttpClient {
}
};

let response_id = response.id.as_number().copied().ok_or(Error::InvalidRequestId)?;

if response_id == *id.inner() {
if response.id == id {
Ok(response.result)
} else {
Err(Error::InvalidRequestId)
Expand All @@ -155,16 +162,18 @@ impl ClientT for HttpClient {
where
R: DeserializeOwned + Default + Clone,
{
let guard = self.id_manager.next_request_ids(batch.len())?;
let ids: Vec<Id> = guard.inner();

let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
let mut ordered_requests = Vec::with_capacity(batch.len());
let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());

let ids = self.id_manager.next_request_ids(batch.len())?;
for (pos, (method, params)) in batch.into_iter().enumerate() {
batch_request.push(RequestSer::new(Id::Number(ids.inner()[pos]), method, params));
ordered_requests.push(ids.inner()[pos]);
request_set.insert(ids.inner()[pos], pos);
batch_request.push(RequestSer::new(&ids[pos], method, params));
ordered_requests.push(&ids[pos]);
request_set.insert(&ids[pos], pos);
}

let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
Expand All @@ -184,8 +193,7 @@ impl ClientT for HttpClient {
// NOTE: `R::default` is placeholder and will be replaced in loop below.
let mut responses = vec![R::default(); ordered_requests.len()];
for rp in rps {
let response_id = rp.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
let pos = match request_set.get(&response_id) {
let pos = match request_set.get(&rp.id) {
Some(pos) => *pos,
None => return Err(Error::InvalidRequestId),
};
Expand Down
33 changes: 29 additions & 4 deletions client/http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@
use crate::types::error::{ErrorCode, ErrorObject, ErrorResponse};
use crate::types::ParamsSer;
use crate::HttpClientBuilder;
use jsonrpsee_core::client::ClientT;
use jsonrpsee_core::client::{ClientT, IdKind};
use jsonrpsee_core::rpc_params;
use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::Id;
use jsonrpsee_test_utils::TimeoutFutureExt;
use serde_json::value::Value as JsonValue;

#[tokio::test]
async fn method_call_works() {
Expand All @@ -42,7 +41,33 @@ async fn method_call_works() {
.await
.unwrap()
.unwrap();
assert_eq!(JsonValue::String("hello".into()), result);
assert_eq!("hello", &result);
}

#[tokio::test]
async fn method_call_with_wrong_id_kind() {
let exp = "id as string";
let server_addr =
http_server_with_hardcoded_response(ok_response(exp.into(), Id::Num(0))).with_default_timeout().await.unwrap();
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
assert!(matches!(
client.request::<String>("o", None).with_default_timeout().await.unwrap(),
Err(Error::InvalidRequestId)
));
}

#[tokio::test]
async fn method_call_with_id_str() {
let exp = "id as string";
let server_addr = http_server_with_hardcoded_response(ok_response(exp.into(), Id::Str("0".into())))
.with_default_timeout()
.await
.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
let response: String = client.request::<String>("o", None).with_default_timeout().await.unwrap().unwrap();
assert_eq!(&response, exp);
}

#[tokio::test]
Expand Down Expand Up @@ -139,7 +164,7 @@ async fn run_batch_request_with_response<'a>(
client.batch_request(batch).with_default_timeout().await.unwrap()
}

async fn run_request_with_response(response: String) -> Result<JsonValue, Error> {
async fn run_request_with_response(response: String) -> Result<String, Error> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().build(&uri).unwrap();
Expand Down
11 changes: 10 additions & 1 deletion client/ws-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub use jsonrpsee_types as types;
use std::time::Duration;

use jsonrpsee_client_transport::ws::{Header, InvalidUri, Uri, WsTransportClientBuilder};
use jsonrpsee_core::client::{CertificateStore, ClientBuilder};
use jsonrpsee_core::client::{CertificateStore, ClientBuilder, IdKind};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};

/// Builder for [`WsClient`].
Expand Down Expand Up @@ -77,6 +77,7 @@ pub struct WsClientBuilder<'a> {
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
max_redirections: usize,
id_kind: IdKind,
}

impl<'a> Default for WsClientBuilder<'a> {
Expand All @@ -90,6 +91,7 @@ impl<'a> Default for WsClientBuilder<'a> {
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
max_redirections: 5,
id_kind: IdKind::Number,
}
}
}
Expand Down Expand Up @@ -143,6 +145,12 @@ impl<'a> WsClientBuilder<'a> {
self
}

/// See documentation for [`ClientBuilder::id_format`] (default is Number).
pub fn id_format(mut self, kind: IdKind) -> Self {
self.id_kind = kind;
self
}

/// Build the client with specified URL to connect to.
/// You must provide the port number in the URL.
///
Expand All @@ -165,6 +173,7 @@ impl<'a> WsClientBuilder<'a> {
.max_notifs_per_subscription(self.max_notifs_per_subscription)
.request_timeout(self.request_timeout)
.max_concurrent_requests(self.max_concurrent_requests)
.id_format(self.id_kind)
.build(sender, receiver))
}
}
41 changes: 38 additions & 3 deletions client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
use crate::types::error::{ErrorCode, ErrorObject, ErrorResponse};
use crate::types::ParamsSer;
use crate::WsClientBuilder;
use jsonrpsee_core::client::Subscription;
use jsonrpsee_core::client::{ClientT, SubscriptionClientT};
use jsonrpsee_core::client::{IdKind, Subscription};
use jsonrpsee_core::rpc_params;
use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
Expand All @@ -44,7 +44,42 @@ async fn method_call_works() {
.await
.unwrap()
.unwrap();
assert_eq!(JsonValue::String("hello".into()), result);
assert_eq!("hello", &result);
}

#[tokio::test]
async fn method_call_with_wrong_id_kind() {
let exp = "id as string";
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
ok_response(exp.into(), Id::Num(0)),
)
.with_default_timeout()
.await
.unwrap();
let uri = format!("ws://{}", server.local_addr());
let client =
WsClientBuilder::default().id_format(IdKind::String).build(&uri).with_default_timeout().await.unwrap().unwrap();

let err = client.request::<String>("o", None).with_default_timeout().await.unwrap();
assert!(matches!(err, Err(Error::RestartNeeded(e)) if e == "Invalid request ID"));
}

#[tokio::test]
async fn method_call_with_id_str() {
let exp = "id as string";
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
ok_response(exp.into(), Id::Str("0".into())),
)
.with_default_timeout()
.await
.unwrap();
let uri = format!("ws://{}", server.local_addr());
let client =
WsClientBuilder::default().id_format(IdKind::String).build(&uri).with_default_timeout().await.unwrap().unwrap();
let response: String = client.request::<String>("o", None).with_default_timeout().await.unwrap().unwrap();
assert_eq!(&response, exp);
}

#[tokio::test]
Expand Down Expand Up @@ -237,7 +272,7 @@ async fn run_batch_request_with_response<'a>(
client.batch_request(batch).with_default_timeout().await.unwrap()
}

async fn run_request_with_response(response: String) -> Result<JsonValue, Error> {
async fn run_request_with_response(response: String) -> Result<String, Error> {
let server = WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), response)
.with_default_timeout()
.await
Expand Down
16 changes: 8 additions & 8 deletions core/src/client/async_client/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ pub(crate) fn process_batch_response(manager: &mut RequestManager, rps: Vec<Resp
let mut rps_unordered: Vec<_> = Vec::with_capacity(rps.len());

for rp in rps {
let id = rp.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
digest.push(id);
let id = rp.id.into_owned();
digest.push(id.clone());
rps_unordered.push((id, rp.result));
}

Expand Down Expand Up @@ -131,7 +131,7 @@ pub(crate) fn process_single_response(
response: Response<JsonValue>,
max_capacity_per_subscription: usize,
) -> Result<Option<RequestMessage>, Error> {
let response_id = response.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
let response_id = response.id.into_owned();
match manager.request_status(&response_id) {
RequestStatus::PendingMethodCall => {
let send_back_oneshot = match manager.complete_pending_call(response_id) {
Expand All @@ -144,7 +144,7 @@ pub(crate) fn process_single_response(
}
RequestStatus::PendingSubscription => {
let (unsub_id, send_back_oneshot, unsubscribe_method) =
manager.complete_pending_subscription(response_id).ok_or(Error::InvalidRequestId)?;
manager.complete_pending_subscription(response_id.clone()).ok_or(Error::InvalidRequestId)?;

let sub_id: Result<SubscriptionId, _> = response.result.try_into();
let sub_id = match sub_id {
Expand All @@ -157,7 +157,7 @@ pub(crate) fn process_single_response(

let (subscribe_tx, subscribe_rx) = mpsc::channel(max_capacity_per_subscription);
if manager
.insert_subscription(response_id, unsub_id, sub_id.clone(), subscribe_tx, unsubscribe_method)
.insert_subscription(response_id.clone(), unsub_id, sub_id.clone(), subscribe_tx, unsubscribe_method)
.is_ok()
{
match send_back_oneshot.send(Ok((subscribe_rx, sub_id.clone()))) {
Expand Down Expand Up @@ -191,14 +191,14 @@ pub(crate) async fn stop_subscription(
/// Builds an unsubscription message.
pub(crate) fn build_unsubscribe_message(
manager: &mut RequestManager,
sub_req_id: u64,
sub_req_id: Id<'static>,
sub_id: SubscriptionId<'static>,
) -> Option<RequestMessage> {
let (unsub_req_id, _, unsub, sub_id) = manager.remove_subscription(sub_req_id, sub_id)?;
let sub_id_slice: &[JsonValue] = &[sub_id.into()];
// TODO: https://github.com/paritytech/jsonrpsee/issues/275
let params = ParamsSer::ArrayRef(sub_id_slice);
let raw = serde_json::to_string(&RequestSer::new(Id::Number(unsub_req_id), &unsub, Some(params))).ok()?;
let raw = serde_json::to_string(&RequestSer::new(&unsub_req_id, &unsub, Some(params))).ok()?;
Some(RequestMessage { raw, id: unsub_req_id, send_back: None })
}

Expand All @@ -207,7 +207,7 @@ pub(crate) fn build_unsubscribe_message(
/// Returns `Ok` if the response was successfully sent.
/// Returns `Err(_)` if the response ID was not found.
pub(crate) fn process_error_response(manager: &mut RequestManager, err: ErrorResponse) -> Result<(), Error> {
let id = err.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
let id = err.id.clone().into_owned();
match manager.request_status(&id) {
RequestStatus::PendingMethodCall => {
let send_back = manager.complete_pending_call(id).expect("State checked above; qed");
Expand Down
Loading