Skip to content

Commit

Permalink
ws client redirections (#397)
Browse files Browse the repository at this point in the history
* feat(ws client): support redirections

* reuse socket

* reuse socket

* add hacks

* fix build

* remove hacks

* fix bad merge

* address grumbles

* fix grumbles

* fix grumbles

* fix nit

* add redirection test

* Update test-utils/src/types.rs

* Resolved todo

* Check that redirected client actually works

* Rename test-utils "types" to "mocks"

* Fix windows test (?)

* fmt

* What is wrong with you windows?

* Ignore redirect test on windows

* fix bad transport errors

* debug windows tests

* update soketto

* maybe fix windows test

* add config flag for max redirections

* revert faulty change.

Relative reference must start with either `/` or `//`

* revert windows path

* use manual join paths

* remove url dep

* Update ws-client/src/tests.rs

* default max redirects 5

* remove needless clone vec

* fix bad merge

* cmon CI run

Co-authored-by: David Palm <dvdplm@gmail.com>
  • Loading branch information
niklasad1 and dvdplm authored Oct 5, 2021
1 parent bda8efe commit 94c881b
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 133 deletions.
2 changes: 1 addition & 1 deletion http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::types::{
};
use crate::HttpClientBuilder;
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::Id;
use jsonrpsee_test_utils::mocks::Id;
use jsonrpsee_test_utils::TimeoutFutureExt;

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion http-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::types::error::{CallError, Error};
use crate::{server::StopHandle, HttpServerBuilder, RpcModule};

use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::{Id, StatusCode, TestContext};
use jsonrpsee_test_utils::mocks::{Id, StatusCode, TestContext};
use jsonrpsee_test_utils::TimeoutFutureExt;
use serde_json::Value as JsonValue;
use tokio::task::JoinHandle;
Expand Down
2 changes: 1 addition & 1 deletion test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ hyper = { version = "0.14.10", features = ["full"] }
log = "0.4"
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = "1"
soketto = "0.7"
soketto = { version = "0.7", features = ["http"] }
tokio = { version = "1", features = ["net", "rt-multi-thread", "macros", "time"] }
tokio-util = { version = "0.6", features = ["compat"] }
2 changes: 1 addition & 1 deletion test-utils/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::types::{Body, HttpResponse, Id, Uri};
use crate::mocks::{Body, HttpResponse, Id, Uri};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Request, Response, Server};
use serde_json::Value;
Expand Down
2 changes: 1 addition & 1 deletion test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::{future::Future, time::Duration};
use tokio::time::{timeout, Timeout};

pub mod helpers;
pub mod types;
pub mod mocks;

/// Helper extension trait which allows to limit execution time for the futures.
/// It is helpful in tests to ensure that no future will ever get stuck forever.
Expand Down
63 changes: 59 additions & 4 deletions test-utils/src/types.rs → test-utils/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ use futures_util::{
stream::{self, StreamExt},
};
use serde::{Deserialize, Serialize};
use soketto::handshake::{self, server::Response, Error as SokettoError, Server};
use std::io;
use std::net::SocketAddr;
use std::time::Duration;
use soketto::handshake::{self, http::is_upgrade_request, server::Response, Error as SokettoError, Server};
use std::{io, net::SocketAddr, time::Duration};
use tokio::net::TcpStream;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};

Expand Down Expand Up @@ -314,3 +312,60 @@ async fn connection_task(socket: tokio::net::TcpStream, mode: ServerMode, mut ex
}
}
}

// Run a WebSocket server running on localhost that redirects requests for testing.
// Requests to any url except for `/myblock/two` will redirect one or two times (HTTP 301) and eventually end up in `/myblock/two`.
pub fn ws_server_with_redirect(other_server: String) -> String {
let addr = ([127, 0, 0, 1], 0).into();

let service = hyper::service::make_service_fn(move |_| {
let other_server = other_server.clone();
async move {
Ok::<_, hyper::Error>(hyper::service::service_fn(move |req| {
let other_server = other_server.clone();
async move { handler(req, other_server).await }
}))
}
});
let server = hyper::Server::bind(&addr).serve(service);
let addr = server.local_addr();

tokio::spawn(async move { server.await });
format!("ws://{}", addr)
}

/// Handle incoming HTTP Requests.
async fn handler(
req: hyper::Request<Body>,
other_server: String,
) -> Result<hyper::Response<Body>, soketto::BoxedError> {
if is_upgrade_request(&req) {
log::debug!("{:?}", req);

match req.uri().path() {
"/myblock/two" => {
let response = hyper::Response::builder()
.status(301)
.header("Location", other_server)
.body(Body::empty())
.unwrap();
Ok(response)
}
"/myblock/one" => {
let response =
hyper::Response::builder().status(301).header("Location", "two").body(Body::empty()).unwrap();
Ok(response)
}
_ => {
let response = hyper::Response::builder()
.status(301)
.header("Location", "/myblock/one")
.body(Body::empty())
.unwrap();
Ok(response)
}
}
} else {
panic!("expect upgrade to WS");
}
}
7 changes: 4 additions & 3 deletions ws-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ arrayvec = "0.7.1"
async-trait = "0.1"
fnv = "1"
futures = { version = "0.3.14", default-features = false, features = ["std"] }
http = "0.2"
jsonrpsee-types = { path = "../types", version = "0.3.0" }
log = "0.4"
pin-project = "1"
rustls-native-certs = "0.5.0"
serde = "1"
serde_json = "1"
soketto = "0.7"
pin-project = "1"
thiserror = "1"
url = "2"
rustls-native-certs = "0.5.0"

[dev-dependencies]
jsonrpsee-test-utils = { path = "../test-utils" }
env_logger = "0.9"
33 changes: 21 additions & 12 deletions ws-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::transport::{Receiver as WsReceiver, Sender as WsSender, Target, WsTransportClientBuilder};
use crate::transport::{Receiver as WsReceiver, Sender as WsSender, WsHandshakeError, WsTransportClientBuilder};
use crate::types::{
traits::{Client, SubscriptionClient},
v2::{Id, Notification, NotificationSer, ParamsSer, RequestSer, Response, RpcError, SubscriptionResponse},
Expand All @@ -46,10 +46,13 @@ use futures::{
prelude::*,
sink::SinkExt,
};
use http::uri::{InvalidUri, Uri};
use tokio::sync::Mutex;

use serde::de::DeserializeOwned;
use std::{borrow::Cow, time::Duration};
use std::{borrow::Cow, convert::TryInto, time::Duration};

pub use soketto::handshake::client::Header;

/// Wrapper over a [`oneshot::Receiver`](futures::channel::oneshot::Receiver) that reads
/// the underlying channel once and then stores the result in String.
Expand Down Expand Up @@ -109,6 +112,7 @@ pub struct WsClientBuilder<'a> {
origin_header: Option<Cow<'a, str>>,
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
max_redirections: usize,
}

impl<'a> Default for WsClientBuilder<'a> {
Expand All @@ -121,6 +125,7 @@ impl<'a> Default for WsClientBuilder<'a> {
origin_header: None,
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
max_redirections: 5,
}
}
}
Expand Down Expand Up @@ -151,8 +156,8 @@ impl<'a> WsClientBuilder<'a> {
}

/// Set origin header to pass during the handshake.
pub fn origin_header(mut self, origin: &'a str) -> Self {
self.origin_header = Some(Cow::Borrowed(origin));
pub fn origin_header(mut self, origin: Cow<'a, str>) -> Self {
self.origin_header = Some(origin);
self
}

Expand All @@ -176,31 +181,35 @@ impl<'a> WsClientBuilder<'a> {
self
}

/// Set the max number of redirections to perform until a connection is regarded as failed.
pub fn max_redirections(mut self, redirect: usize) -> Self {
self.max_redirections = redirect;
self
}

/// Build the client with specified URL to connect to.
/// If the port number is missing from the URL, the default port number is used.
///
///
/// `ws://host` - port 80 is used
///
/// `wss://host` - port 443 is used
/// You must provide the port number in the URL.
///
/// ## Panics
///
/// Panics if being called outside of `tokio` runtime context.
pub async fn build(self, url: &'a str) -> Result<WsClient, Error> {
pub async fn build(self, uri: &'a str) -> Result<WsClient, Error> {
let certificate_store = self.certificate_store;
let max_capacity_per_subscription = self.max_notifs_per_subscription;
let max_concurrent_requests = self.max_concurrent_requests;
let request_timeout = self.request_timeout;
let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
let (err_tx, err_rx) = oneshot::channel();

let uri: Uri = uri.parse().map_err(|e: InvalidUri| Error::Transport(e.into()))?;

let builder = WsTransportClientBuilder {
certificate_store,
target: Target::parse(url).map_err(|e| Error::Transport(e.into()))?,
target: uri.try_into().map_err(|e: WsHandshakeError| Error::Transport(e.into()))?,
timeout: self.connection_timeout,
origin_header: self.origin_header,
max_request_body_size: self.max_request_body_size,
max_redirections: self.max_redirections,
};

let (sender, receiver) = builder.build().await.map_err(|e| Error::Transport(e.into()))?;
Expand Down
33 changes: 32 additions & 1 deletion ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::types::{
};
use crate::WsClientBuilder;
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::{Id, WebSocketTestServer};
use jsonrpsee_test_utils::mocks::{Id, WebSocketTestServer};
use jsonrpsee_test_utils::TimeoutFutureExt;
use serde_json::Value as JsonValue;

Expand Down Expand Up @@ -263,3 +263,34 @@ fn assert_error_response(err: Error, exp: ErrorObject) {
e => panic!("Expected error: \"{}\", got: {:?}", err, e),
};
}

#[tokio::test]
async fn redirections() {
let _ = env_logger::try_init();
let expected = "abc 123";
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
ok_response(expected.into(), Id::Num(0)),
)
.with_default_timeout()
.await
.unwrap();

let server_url = format!("ws://{}", server.local_addr());
let redirect_url = jsonrpsee_test_utils::mocks::ws_server_with_redirect(server_url);

// The client will first connect to a server that only performs re-directions and finally
// redirect to another server to complete the handshake.
let client = WsClientBuilder::default().build(&redirect_url).with_default_timeout().await;
// It's an ok client
let client = match client {
Ok(Ok(client)) => client,
Ok(Err(e)) => panic!("WsClient builder failed with: {:?}", e),
Err(e) => panic!("WsClient builder timed out with: {:?}", e),
};
// It's connected
assert!(client.is_connected());
// It works
let response = client.request::<String>("anything", ParamsSer::NoParams).with_default_timeout().await.unwrap();
assert_eq!(response.unwrap(), String::from(expected));
}
Loading

0 comments on commit 94c881b

Please sign in to comment.